Status
...
Page properties
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
...
JIRA: tbd
Released: tbd
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar: they batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They primarily differ only in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently.
...
There are two user-facing aspects of the generic sink. First, an abstract class that is used to implement a new sink for a concrete destination. Second, the interface that is used by end users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Stream sink.
The async sink is based on FLIP-143 and FLIP-177. It uses the following generic types to remain extensible and agnostic to the destination.
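As a self-contained sketch of how the two generic types relate, consider the following plain-Java illustration. The names `SimpleElementConverter` and `SimpleRequestEntry` are assumptions for illustration; they stand in for Flink's actual `ElementConverter` role and a destination-specific request entry, respectively.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the InputT -> RequestEntryT relationship.
// SimpleElementConverter and SimpleRequestEntry are hypothetical names,
// not Flink's actual API.
public class GenericTypesSketch {

    // Maps a stream element (InputT) to a request entry (RequestEntryT),
    // analogous to the ElementConverter role in the async sink.
    interface SimpleElementConverter<InputT, RequestEntryT extends Serializable> {
        RequestEntryT apply(InputT element);
    }

    // A destination-specific request entry: here, a payload plus a partition key.
    static class SimpleRequestEntry implements Serializable {
        final String payload;
        final String partitionKey;

        SimpleRequestEntry(String payload, String partitionKey) {
            this.payload = payload;
            this.partitionKey = partitionKey;
        }
    }

    public static void main(String[] args) {
        SimpleElementConverter<Integer, SimpleRequestEntry> converter =
                element -> new SimpleRequestEntry(
                        element.toString(), String.valueOf(element.hashCode()));

        // The sink buffers converted entries and later submits them in batches.
        List<SimpleRequestEntry> buffer = new ArrayList<>();
        for (int element : new int[] {1, 2, 3}) {
            buffer.add(converter.apply(element));
        }

        System.out.println(buffer.size());          // 3
        System.out.println(buffer.get(0).payload);  // 1
    }
}
```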
...
Code Block

public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and resubmit them using the {@code requestResult}
     * callback.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been re-queued
     */
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            ResultFuture<RequestEntryT> requestResult);

    ...
}
Internally, the AsyncSinkWriter buffers RequestEntryTs and invokes the submitRequestEntries method with a set of RequestEntryTs according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, i.e., calls to the API that have been sent but not completed. During a commit, the sink enforces that all in-flight requests have completed and that currently buffered RequestEntryTs are persisted in the application state snapshot.
Code Block

/**
 * The ElementConverter provides a mapping between the elements of a
 * stream to request entries that can be sent to the destination.
 * <p>
 * The resulting request entry is buffered by the AsyncSinkWriter and sent
 * to the destination when the {@code submitRequestEntries} method is
 * invoked.
 */
private final ElementConverter<InputT, RequestEntryT> elementConverter;

/**
 * Buffer to hold request entries that should be persisted into the
 * destination.
 * <p>
 * A request entry contains all relevant details to make a call to the
 * destination. E.g., for Kinesis Data Streams a request entry contains the
 * payload and partition key.
 * <p>
 * It seems more natural to buffer InputT, i.e., the events that should be
 * persisted, rather than RequestEntryT. However, in practice, the response
 * of a failed request call can make it very hard, if not impossible, to
 * reconstruct the original event. It is much easier to just construct a
 * new (retry) request entry from the response and add that back to the
 * queue for later retry.
 */
private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();

/**
 * Tracks all pending async calls that have been executed since the last
 * checkpoint. Calls that already completed (successfully or unsuccessfully)
 * automatically decrement the counter. Any request entry that was not
 * successfully persisted needs to be handled and retried by the logic in
 * {@code submitRequestEntries}.
 * <p>
 * There is a limit on the number of concurrent (async) requests that can be
 * handled by the client library. This limit is enforced by checking this
 * counter before issuing new requests.
 * <p>
 * To complete a checkpoint, we need to make sure that no requests are in
 * flight, as they may fail, which could then lead to data loss.
 */
private int inFlightRequestsCount;

@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
    // blocks if too many elements have been buffered
    while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
        mailboxExecutor.yield();
    }

    bufferedRequestEntries.add(elementConverter.apply(element, context));

    // blocks if too many async requests are in flight
    flush();
}

/**
 * Persists buffered RequestEntries into the destination by invoking {@code
 * submitRequestEntries} with batches according to the user specified
 * buffering hints.
 *
 * The method blocks if too many async requests are in flight.
 */
private void flush() throws InterruptedException {
    while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

        // create a batch of request entries that should be persisted in the destination
        ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

        while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
            try {
                batch.add(bufferedRequestEntries.remove());
            } catch (NoSuchElementException e) {
                // if there are not enough elements, just create a smaller batch
                break;
            }
        }

        ResultFuture<RequestEntryT> requestResult =
                failedRequestEntries -> mailboxExecutor.execute(
                        () -> completeRequest(failedRequestEntries),
                        "Mark in-flight request as completed and requeue %d request entries",
                        failedRequestEntries.size());

        while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
            mailboxExecutor.yield();
        }

        inFlightRequestsCount++;

        // call the destination specific code that actually persists the request entries
        submitRequestEntries(batch, requestResult);
    }
}

/**
 * Marks an in-flight request as completed and prepends failed requestEntries
 * back to the internal requestEntry buffer for later retry.
 *
 * @param failedRequestEntries requestEntries that need to be retried
 */
private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
    inFlightRequestsCount--;

    // By just iterating through failedRequestEntries, it reverses the order of
    // the failedRequestEntries. It doesn't make a difference for
    // kinesis:putRecords, as the api does not make any order guarantees, but
    // may cause avoidable reorderings for other destinations.
    failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
}

/**
 * In-flight requests will be retried if the sink is still healthy. But if
 * in-flight requests fail after a checkpoint has been triggered and Flink
 * needs to recover from the checkpoint, the (failed) in-flight requests are
 * gone and cannot be retried. Hence, there cannot be any outstanding
 * in-flight requests when a commit is initialized.
 * <p>
 * To this end, all in-flight requests need to be completed before proceeding
 * with the commit.
 */
@Override
public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
    if (flush) {
        flush();
    }

    // wait until all in-flight requests completed
    while (inFlightRequestsCount > 0) {
        mailboxExecutor.yield();
    }

    return Collections.emptyList();
}

/**
 * All in-flight requests have been completed, but there may still be
 * request entries in the internal buffer that are yet to be sent to the
 * endpoint. These request entries are stored in the snapshot state so that
 * they don't get lost in case of a failure/restart of the application.
 */
@Override
public List<Collection<RequestEntryT>> snapshotState() throws IOException {
    return Collections.singletonList(bufferedRequestEntries);
}
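The interplay of buffering, batch creation, and the in-flight counter described above can be illustrated with a self-contained simplification. This is a sketch in plain Java without any Flink dependencies; the class name `BatchingSketch`, the fixed `MAX_BATCH_SIZE`, and the synchronous submit callback are assumptions for illustration only, not part of the proposed API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Self-contained simplification of the batching logic above. No Flink types
// are used; MAX_BATCH_SIZE and the submit callback are illustrative only.
public class BatchingSketch {

    static final int MAX_BATCH_SIZE = 3;

    final Deque<String> bufferedRequestEntries = new ArrayDeque<>();
    int inFlightRequestsCount;

    final Consumer<List<String>> submitRequestEntries;

    BatchingSketch(Consumer<List<String>> submitRequestEntries) {
        this.submitRequestEntries = submitRequestEntries;
    }

    void write(String element) {
        bufferedRequestEntries.add(element);
        flush();
    }

    // Drain full batches from the buffer, analogous to flush() above.
    void flush() {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
            List<String> batch = new ArrayList<>(MAX_BATCH_SIZE);
            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                batch.add(bufferedRequestEntries.remove());
            }
            inFlightRequestsCount++;
            submitRequestEntries.accept(batch);
        }
    }

    // Completion handler: decrement the counter and prepend failures for retry.
    void completeRequest(List<String> failedRequestEntries) {
        inFlightRequestsCount--;
        for (String entry : failedRequestEntries) {
            bufferedRequestEntries.addFirst(entry);
        }
    }

    public static void main(String[] args) {
        List<List<String>> submitted = new ArrayList<>();
        BatchingSketch sketch = new BatchingSketch(submitted::add);

        for (int i = 0; i < 7; i++) {
            sketch.write("entry-" + i);
        }

        // 7 writes with a batch size of 3 produce two submitted batches;
        // one entry remains buffered.
        System.out.println(submitted.size());                      // 2
        System.out.println(sketch.bufferedRequestEntries.size());  // 1
    }
}
```

Note that, as in the real writer, failed entries are prepended to the buffer so they are retried before newer entries.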
...
Appendix A – Simplified example implementation of a sink for Kinesis Data Streams
Example ElementConverter for Kinesis Data Streams
This is the functionality that needs to be implemented by end users of the sink. It specifies how an element of a DataStream is mapped to a PutRecordsRequestEntry
that can be submitted to the Kinesis Data Streams API.
Code Block
new ElementConverter<InputT, PutRecordsRequestEntry>() {
@Override
public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
return PutRecordsRequestEntry
.builder()
.data(SdkBytes.fromUtf8String(element.toString()))
.partitionKey(String.valueOf(element.hashCode()))
.build();
}
}
Simplified AsyncSinkWriter for Kinesis Data Streams
...
This is a simplified sample implementation of the AsyncSinkWriter
for Kinesis Data Streams
...
Given a set of buffered PutRecordsRequestEntries, it creates and submits a batch request against the Kinesis Data Streams API using the KinesisAsyncClient. The response of the API call is then checked for events that were not persisted successfully (e.g., because of throttling or network failures) and those events are added back to the internal buffer of the AsyncSinkWriter.
Code Block

private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(
            List<PutRecordsRequestEntry> requestEntries,
            ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
            if (response.failedRecordCount() > 0) {
                ArrayList<PutRecordsRequestEntry> failedRequestEntries =
                        new ArrayList<>(response.failedRecordCount());
                List<PutRecordsResultEntry> records = response.records();

                for (int i = 0; i < records.size(); i++) {
                    if (records.get(i).errorCode() != null) {
                        failedRequestEntries.add(requestEntries.get(i));
                    }
                }

                requestResult.complete(failedRequestEntries);
            } else {
                requestResult.complete(Collections.emptyList());
            }

            //TODO: handle errors of the entire request...
        });
    }

    ...
}
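The per-record failure check in the response handler can be exercised in isolation with a small simplification. This is plain Java with no AWS SDK dependency; the nested `Record` class and the error-code string below are assumptions standing in for the SDK's `PutRecordsResultEntry` and its error codes.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained simplification of the partial-failure handling above:
// records with a non-null error code are collected for re-queueing.
// Record is a hypothetical stand-in for the AWS SDK's PutRecordsResultEntry.
public class PartialFailureSketch {

    static class Record {
        final String errorCode; // null means the record was persisted successfully

        Record(String errorCode) {
            this.errorCode = errorCode;
        }
    }

    // Mirrors the loop in submitRequestEntries: the i-th response record
    // corresponds to the i-th request entry.
    static List<String> collectFailedEntries(List<String> requestEntries, List<Record> records) {
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            if (records.get(i).errorCode != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> requestEntries = List.of("a", "b", "c");
        List<Record> records = List.of(
                new Record(null),
                new Record("ProvisionedThroughputExceededException"),
                new Record(null));

        // Only the throttled entry is collected for a later retry.
        System.out.println(collectFailedEntries(requestEntries, records)); // [b]
    }
}
```

The key property this relies on is that the Kinesis `PutRecords` response preserves the order of the request entries, so a positional match is sufficient to identify which entries failed.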
Rejected Alternatives
...