Status
...
Page properties | |
---|---|
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
|
...
...
JIRA: tbd
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
There are two user-facing aspects of the generic sink. First, an abstract class that is used to implement a new sink for a concrete destination. Second, the interface that is used by end-users, who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Stream sink.
The async sink is based on FLIP-143 . and FLIP-177. It is based on the following generic types to be extensible and remain agnostic to the destination.
...
Code Block | ||||
---|---|---|---|---|
| ||||
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, AtomicLongVoid, Collection<RequestEntryT>> { /** * This method specifies how to persist buffered request entries into the * destination. It is implemented when support for a new destination is * added. * <p> * The method is invoked with a set of request entries according to the * buffering hints (and the valid limits of the destination). The logic then * needs to create and execute the request against the destination (ideally * by batching together multiple request entries to increase efficiency). * The logic also needs to identify individual request entries that were not * persisted successfully and resubmit them using the {@code * requeueFailedRequestEntryrequestResult} method. * <p> * During checkpointing, the sink needs to ensure that there are no * outstanding in-flight requests. * * @param requestEntries a set of request entries that should be sent to the * destination * @param requestResult a ResultFuture that needs to be completed once all * request entries that have been passed to the method * on invocation have either been successfully * persisted in the destination or have been * re-queued * @return a future that completes when all request entries have been * successfully persisted to the API or were re-queued */ protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<?>ResultFuture<RequestEntryT> requestResult); ... } |
Internally, the AsyncSinkWriter
buffers RequestEntryT
s and invokes the submitRequestEntries
method with a set of RequestEntryT
s according to user specified buffering hints. The AsyncSinkWriter
also tracks in-flight requests, ie, calls to the API that have been sent but not completed. During a commit, the sink enforces that all in-flight requests have completed and currently buffered RequestEntryT
s are persisted in the application state snapshot.
Code Block | ||||
---|---|---|---|---|
| ||||
/** * The ElementConverter provides a mapping between for the elements of a * stream to request entries that can be sent to the destination. * <p> * The resulting request entry is buffered by the AsyncSinkWriter and sent * to the destination when the {@code submitRequestEntries} method is * invoked. */ private final ElementConverter<InputT, RequestEntryT> elementConverter; /** * Buffer to hold request entries that should be persisted into the * destination. * <p> * A request entry contain all relevant details to make a call to the * destination. Eg, for Kinesis Data Streams a request entry contains the * payload and partition key. * <p> * It seems more natural to buffer InputT, ie, the events that should be * persisted, rather than RequestEntryT. However, in practice, the response * of a failed request call can make it very hard, if not impossible, to * reconstruct the original event. It is much easier, to just construct a * new (retry) request entry from the response and add that back to the * queue for later retry. */ private final BlockingDeque<RequestEntryT>Deque<RequestEntryT> bufferedRequestEntries = new LinkedBlockingDeque<>ArrayDeque<>(...); /** * Tracks all pending async calls that have been executed since the last * checkpoint. Calls that completed (successfully or unsuccessfully) are * automatically decrementing the counter. Any request entry that was not * successfully persisted needs to be handled and retried by the logic in * {@code submitRequestsToApi}. * <p> * There is a limit on the number of concurrent (async) requests that can be * handled by the client library. This limit is enforced by checking the * size of this queue before issuing new requests. * <p> * To complete a checkpoint, we need to make sure that no requests are in * flight, as they may fail, which could then lead to data loss. */ private AtomicLongint numberOfInFlightRequestsinFlightRequestsCount; @Override public void write(InputT element, Context context) throws IOException, InterruptedException { // blocks if too many eventselements have been buffered while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) { mailboxExecutor.yield(); } bufferedRequestEntries.putLastadd(elementConverter.apply(element, context)); // blocks if too many async requests are in flight if (/*buffering hints are met*/) { flush(); } flush(); } /** * ThePersists buffered entireRequestsEntries requestinto maythe faildestination orby singleinvoking request{@code entries that are part of * submitRequestEntries} with batches according *to the requestuser mayspecified not be persisted successfully, eg,* because of networkbuffering hints. * * issues or service side throttling. All request entries that failed with The method blocks if too many async requests are in flight. */ private void flush() throws InterruptedException { while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) { // create a batch of request entries that should be persisted in the destination ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE); while (batch.size() <= MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) { try { * transient failures need to be re-queued with this method so that aren't batch.add(bufferedRequestEntries.remove()); * lost and can be retried later. * <p> } catch (NoSuchElementException e) { * Request entries that are causing the same error in a reproducible manner, * eg, ill-formed request entries, must // if there are not be re-queued but the error needsenough elements, just create a smaller batch * to be handled in the logic of {@code submitRequestEntries}. Otherwise break; * these request entries will be retried indefinitely, always causing the *} same error. */ protected void requeueFailedRequestEntry(RequestEntryT requestEntry) throws InterruptedException { } ResultFuture<RequestEntryT> bufferedRequestEntries.putFirst(requestEntry);requestResult = } /** * Persists buffered RequestsEntries into the destination byfailedRequestEntries invoking {@code-> mailboxExecutor.execute( * submitRequestEntries} with batches according to the user specified * buffering hints. () *-> completeRequest(failedRequestEntries), * The method blocks if too many async requests are in flight. */ private void flush() throws InterruptedException { while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) { "Mark in-flight request as completed and requeue %d request entries", // create a batch of request entries that should be persisted in the destination failedRequestEntries.size()); ArrayList<RequestEntryT>while batch(inFlightRequestsCount >= new ArrayList<>(MAX_IN_BATCHFLIGHT_SIZEREQUESTS); { for (int i = 0; i < MAX_BATCH_SIZE; i++) {mailboxExecutor.yield(); } try { inFlightRequestsCount++; batch.add(bufferedRequestEntries.remove()submitRequestEntries(batch, requestResult); } } }/** catch (NoSuchElementException e) { * Marks an in-flight request as completed and prepends failed requestEntries back to the * //internal ifrequestEntry therebuffer arefor not enough elements, just create a smaller batchlater retry. * * @param failedRequestEntries requestEntries that need to be retried break;*/ private void completeRequest(Collection<RequestEntryT> failedRequestEntries) { }inFlightRequestsCount--; // By just } ResultFuture<?> requestResult = ... iterating through failedRequestEntries, it reverses the order of the if// (numberOfInFlightRequestsfailedRequestEntries.getAndIncrement() >= MAX_IN_FLIGHT_REQUESTS) { It doesn't make a difference for kinesis:putRecords, as the api // blockdoes andnot waitmake untilany enough in-fligh requests have completed order guarantees, but may cause avoidable reorderings for other } // call the destination specific code that actually persists the request entriesdestinations. submitRequestEntries(batch, requestResultfailedRequestEntries.forEach(bufferedRequestEntries::addFirst); } /** * In flight requests will be retried if the sink is still healthy. But if in-flight requests * in-flight requests fail after a checkpoint has been triggered and Flink * needs to recover from the checkpoint, * the (failed) in-flight requests are * gone and cannot be retried. Hence, there cannot be any outstanding * outstanding in-flight requests when a commit is initialized. * <p> * To<p>To this end, all in-flight requests need to be passed to the {@code * AsyncSinkCommiter} in order to be completed asbefore partproceeding ofwith the pre commit. */ @Override public List<AtomicLong>List<Void> prepareCommit(boolean flush) throws IOException { logger.info("Prepare commit. {} requests currently in flight.", numberOfInFlightRequests.get()); // reuse current inFlightRequests as commitable and create empty queue to avoid copy and clearing, InterruptedException { if (flush) { List<AtomicLong> committable = Collections.singletonListflush(numberOfInFlightRequests); //} all in-flight requests are handled by the AsyncSinkCommiter// andwait newuntil elementsall cannotin-flight be added to the queue during a commit, so it's save to create a counterrequests completed while (inFlightRequestsCount > 0) { numberOfInFlightRequests = new AtomicLongmailboxExecutor.yield(); } return committableCollections.emptyList(); } /** * All in-flight requests have been completed, but there may still be * request entries in the internal buffer that are yet to be sent to the * endpoint. These request entries are stored in the snapshot state so that * they don't get lost in case of a failure/restart of the application. */ @Override public List<Collection<RequestEntryT>> snapshotState() throws IOException { return Collections.singletonList(bufferedRequestEntries); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { @Override protected CompletableFuture<?>void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<?>ResultFuture<PutRecordsRequestEntry> requestResult) { // create a batch request PutRecordsRequest batchRequest = PutRecordsRequest .builder() .records(requestEntries) .streamName(streamName) .build(); // call api with batch request CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest); // re-queue elements of failed requests future.whenComplete((response, err) -> { if (response.failedRecordCount() > 0) { ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount() > 0) {); List<PutRecordsResultEntry> records = response.records(); for (int i = 0; i < records.size(); i++) { if (records.get(i).errorCode() != null) { requeueFailedRequestfailedRequestEntries.add(requestEntries.get(i)); } } requestResult.complete(failedRequestEntries); } else { requestResult.complete(Collections.emptyList()); } //TODO: handle errors of the entire request... }); } ... } |
...