Versions Compared

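Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

  (The colour coding behind this key is not preserved in this copy, so added and removed text appear run together in the code blocks below.)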

...

Code Block
language: java
title: AsyncSinkWriter Internals
    /**
     * The ElementConverter maps the elements of the stream to request entries
     * that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination, e.g., for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, i.e., the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     * <p>
     * The capacity argument is elided in this design; a bounded capacity is
     * what makes {@code putLast} in {@code write} block when too many entries
     * are buffered.
     */
    private final BlockingDeque<RequestEntryT> bufferedRequestEntries = new LinkedBlockingDeque<>(...);


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that already completed (successfully or unsuccessfully)
     * are automatically removed from the queue. Any request entry that was not
     * successfully persisted needneeds to be handled and retried by the logic in
     * {@code submitRequestsToApi}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking the
     * size of this queue before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private BlockingDeque<CompletableFuture<?>> inFlightRequests = new LinkedBlockingDeque<>(...)AtomicLong numberOfInFlightRequests;


    /**
     * Converts {@code element} into a request entry, appends it to the end of
     * the buffer and triggers a flush once the buffering hints are met.
     */
    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        RequestEntryT requestEntry = elementConverter.apply(element, context);

        // putLast blocks while too many events are already buffered
        bufferedRequestEntries.putLast(requestEntry);

        if (/*buffering hints are met*/) {
            // flush blocks while too many async requests are in flight
            flush();
        }
    }


    /**
     * Puts a failed request entry back at the head of the buffer, so it is
     * picked up for retry before entries that were written later.
     * <p>
     * The entire request may fail, or single request entries that are part of
     * the request may not be persisted successfully, e.g., because of network
     * issues or service-side throttling. All request entries that failed with
     * transient failures need to be re-queued with this method so that they
     * aren't lost and can be retried later.
     * <p>
     * Request entries that cause the same error in a reproducible manner,
     * e.g., ill-formed request entries, must not be re-queued; instead, the
     * error needs to be handled in the logic of {@code submitRequestEntries}.
     * Otherwise these request entries will be retried indefinitely, always
     * causing the same error.
     * <p>
     * NOTE(review): putFirst blocks while the buffer is full. If this is called
     * from an async completion callback while write() is itself blocked in
     * putLast on a full buffer, no thread drains the buffer (flush only runs
     * from write) — confirm the capacity/threading model rules this out.
     *
     * @param requestEntry the failed entry to retry
     * @throws InterruptedException if interrupted while waiting for buffer space
     */
    protected void requeueFailedRequestEntry(RequestEntryT requestEntry) throws InterruptedException {
        bufferedRequestEntries.putFirst(requestEntry);
    }


    /**
     * Persists buffered request entries into the destination by invoking
     * {@code submitRequestEntries} with batches of at most
     * {@code MAX_BATCH_SIZE} entries, taken from the head of the buffer.
     * <p>
     * The loop only runs while at least {@code MAX_BATCH_SIZE} entries are
     * buffered.
     * <p>
     * The method blocks while {@code MAX_IN_FLIGHT_REQUESTS} async requests are
     * already in flight.
     *
     * @throws InterruptedException declared for the blocking wait on in-flight
     *         requests
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
            // create a batch of request entries that should be persisted in the destination;
            // drainTo takes up to MAX_BATCH_SIZE entries from the head of the deque and simply
            // yields a smaller batch if fewer are available (no exception-driven control flow)
            List<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);
            bufferedRequestEntries.drainTo(batch, MAX_BATCH_SIZE);

            // placeholder from the design: how the result future is created is not specified here
            ResultFuture<?> requestResult = ...;

            // wait for a free slot *before* counting this request as in flight, and re-check the
            // limit after every wake-up; a one-shot `if` around getAndIncrement() would count the
            // request while still waiting and never re-check after being woken
            while (numberOfInFlightRequests.get() >= MAX_IN_FLIGHT_REQUESTS) {
                // block and wait until enough in-flight requests have completed
                // (placeholder: the blocking mechanism is not specified in this design)
            }
            numberOfInFlightRequests.incrementAndGet();

            // call the destination specific code that actually persists the request entries
            submitRequestEntries(batch, requestResult);
        }
    }


    /**
    // remove* theIn requestflight fromrequests thewill trackingbe queueretried onceif itthe competed
sink is           future.whenComplete((response, err) -> {still healthy. But if
                inFlightRequests.remove(future);
            });
        }
    }


    /**
     * In in-flight requests may fail, butafter theya willcheckpoint behas retriedbeen iftriggered theand sink isFlink
     * still healthy.
     * <p> needs to recover from the checkpoint, the (failed) in-flight requests are
     * gone Toand notcannot losebe anyretried. requestsHence, there cannot be any outstanding in-flight
     * in-flight requests when a commit is initialized.
     * <p>
     * To this end, all in-flight this end, all in-flight requests need to be passed to the {@code
     * requests needAsyncSinkCommiter} in order to be completed as part of the pre commit.
     */
    @Override
    public List<Collection<CompletableFuture<?>>>List<AtomicLong> prepareCommit(boolean flush) throws IOException {

        // reuse current inFlightRequests as commitable and create an empty queue 
        //logger.info("Prepare commit. {} requests currently in flight.", numberOfInFlightRequests.get());

        // reuse current inFlightRequests as commitable and create empty queue to avoid copy and clearing
        List<Collection<CompletableFuture<?>>>List<AtomicLong> committable = Collections.singletonList(inFlightRequestsnumberOfInFlightRequests);

        // all in-flight requests are handled by the AsyncSinkCommiter and new 
        // elements cannot be added to the queue during a commit, so it's save to 
        // create a new queuecounter
        inFlightRequestsnumberOfInFlightRequests = new LinkedBlockingDeque<>AtomicLong();

        return committable;
    }


    /**
     * Returns the request entries that are still in the internal buffer and
     * yet to be sent to the endpoint. They are stored in the snapshot state so
     * that they don't get lost in case of a failure/restart of the application.
     * <p>
     * A point-in-time copy is returned instead of the live deque, because
     * failed entries can be re-queued into the buffer from async completion
     * callbacks after this method has returned.
     * <p>
     * NOTE(review): since prepareCommit hands in-flight requests to the
     * committer rather than waiting for them, requests may still be in flight
     * when this runs, and entries they re-queue later are not part of this
     * snapshot — confirm how those entries are covered.
     *
     * @return a singleton list holding a copy of the buffered request entries
     * @throws IOException declared by the writer interface
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        Collection<RequestEntryT> bufferedSnapshot = new ArrayList<>(bufferedRequestEntries);
        return Collections.singletonList(bufferedSnapshot);
    }

...

Code Block
language: java
title: AmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected CompletableFuture<?> submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<?> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        CompletableFuture<PutRecordsResponse> handleResponse = future
    re-queue elements of failed requests
         future.whenComplete((response, err) -> {
                if (response.failedRecordCount() > 0) {
                    List<PutRecordsResultEntry> records = response.records();

                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            requeueFailedRequest(requestEntries.get(i));
                        }
                    }
                }

                //TODO: handle errors of the entire request...
   requestResult.complete(Collections.emptyList());

         });

        //TODO: returnhandle errors futureof tothe track completion of async request
entire request...
           return handleResponse});
    }

    ...
}


Rejected Alternatives

...