Status
...
Page properties | |
---|---|
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
|
...
...
JIRA: tbd
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The generic sink has two user-facing aspects. First, an abstract class that sink authors extend to implement a new sink for a concrete destination. Second, the interface used by end users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Stream sink.
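To illustrate the end-user side, the following is a minimal sketch of an element converter that maps events to request entries. It is not the proposed API: `ElementConverterSketch` is a simplified stand-in for the `ElementConverter` used by the writer (the sink context argument is omitted), and `SketchRequestEntry` and `ElementConverterExample` are hypothetical names. The entry carries a payload and a partition key, as a Kinesis Data Streams request entry would; deriving the partition key from the hash code is an assumption made only for this example.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for the ElementConverter used by the writer.
// Assumption: the sink context argument is omitted to keep the sketch self-contained.
interface ElementConverterSketch<InputT, RequestEntryT> {
    RequestEntryT apply(InputT element);
}

// Hypothetical request entry holding a payload and a partition key.
final class SketchRequestEntry {
    final byte[] payload;
    final String partitionKey;

    SketchRequestEntry(byte[] payload, String partitionKey) {
        this.payload = payload;
        this.partitionKey = partitionKey;
    }
}

final class ElementConverterExample {
    // What an end user might supply: a mapping from String events to request entries.
    // Assumption: the partition key is derived from the element's hash code.
    static final ElementConverterSketch<String, SketchRequestEntry> CONVERTER =
            element -> new SketchRequestEntry(
                    element.getBytes(StandardCharsets.UTF_8),
                    String.valueOf(element.hashCode()));

    public static void main(String[] args) {
        SketchRequestEntry entry = CONVERTER.apply("hello");
        System.out.println(entry.payload.length); // prints 5
    }
}
```

The sink implementer never sees the original event after conversion, which matches the design choice of buffering request entries rather than input elements.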
...
    /**
     * The ElementConverter provides a mapping from the elements of a
     * stream to request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;

    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. E.g., for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, i.e., the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();

    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that complete (successfully or unsuccessfully)
     * decrement the counter automatically. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking this
     * counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;

    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // blocks if too many async requests are in flight
        flush();
    }

    /**
     * Persists buffered request entries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user-specified
     * buffering hints.
     *
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination;
            // if there are not enough elements, just create a smaller batch
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                batch.add(bufferedRequestEntries.remove());
            }

            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            // blocks if too many async requests are in flight
            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            inFlightRequestsCount++;
            submitRequestEntries(batch, requestResult);
        }
    }

    /**
     * Marks an in-flight request as completed and prepends failed request entries back to the
     * internal request entry buffer for later retry.
     *
     * @param failedRequestEntries request entries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // Iterating forward through failedRequestEntries and prepending each one reverses
        // their order. It doesn't make a difference for kinesis:putRecords, as the API
        // does not make any order guarantees, but may cause avoidable reorderings for other
        // destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }

    /**
     * In-flight requests will be retried if the sink is still healthy. But if
     * in-flight requests fail after a checkpoint has been triggered and Flink
     * needs to recover from the checkpoint, the (failed) in-flight requests are
     * gone and cannot be retried. Hence, there cannot be any outstanding
     * in-flight requests when a commit is initialized.
     * <p>
     * To this end, all in-flight requests need to be completed before
     * proceeding with the pre-commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests have completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }

    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
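The comment in `completeRequest` notes that prepending failed entries one by one reverses their order. For destinations where ordering matters, one possible way to avoid this is to walk the failed entries back to front before prepending them. The following is a minimal sketch of that idea, assuming the failed entries are available as a `List`; `RequeueSketch` and its methods are hypothetical names for illustration and are not part of the proposed `AsyncSinkWriter`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;

// Hypothetical helper for illustration only; not part of the proposed AsyncSinkWriter.
final class RequeueSketch {

    // Mirrors completeRequest: forward iteration with addFirst reverses the failed entries.
    static <T> void requeueReversing(Deque<T> buffer, List<T> failed) {
        failed.forEach(buffer::addFirst);
    }

    // Walks the failed entries back to front, so they keep their relative order
    // at the head of the buffer.
    static <T> void requeuePreservingOrder(Deque<T> buffer, List<T> failed) {
        ListIterator<T> it = failed.listIterator(failed.size());
        while (it.hasPrevious()) {
            buffer.addFirst(it.previous());
        }
    }

    public static void main(String[] args) {
        Deque<String> reversed = new ArrayDeque<>(Arrays.asList("c", "d"));
        requeueReversing(reversed, Arrays.asList("a", "b"));
        System.out.println(reversed); // prints [b, a, c, d]

        Deque<String> ordered = new ArrayDeque<>(Arrays.asList("c", "d"));
        requeuePreservingOrder(ordered, Arrays.asList("a", "b"));
        System.out.println(ordered); // prints [a, b, c, d]
    }
}
```

Either variant keeps the failed entries ahead of entries that were buffered later; only the order among the failed entries themselves differs.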
...