...

AsyncSinkWriter Internals (Java)
    /**
     * The ElementConverter maps the elements of a stream to request entries
     * that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. E.g., for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, i.e., the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to construct a new
     * (retry) request entry from the response and add that back to the queue
     * for later retry.
     */
    private final BlockingDeque<RequestEntryT> bufferedRequestEntries = new LinkedBlockingDeque<>(...);


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that already completed (successfully or unsuccessfully)
     * are automatically removed from the queue. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking the
     * size of this queue before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private BlockingDeque<CompletableFuture<?>> inFlightRequests = new LinkedBlockingDeque<>(...);


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        bufferedRequestEntries.putLast(elementConverter.apply(element, context));

        // for now, flush will check if enough events have been buffered
        flush();
    }


    /**
     * The entire request may fail, or single request entries that are part of
     * the request may not be persisted successfully, e.g., because of network
     * issues or service-side throttling. All request entries that failed with
     * transient failures need to be re-queued with this method so that they
     * aren't lost and can be retried later.
     * <p>
     * Request entries that are causing the same error in a reproducible manner,
     * e.g., ill-formed request entries, must not be re-queued, but the error needs
     * to be handled in the logic of {@code submitRequestEntries}. Otherwise
     * these request entries will be retried indefinitely, always causing the
     * same error.
     */
    protected void requeueFailedRequestEntry(RequestEntryT requestEntry) throws InterruptedException {
        bufferedRequestEntries.putFirst(requestEntry);
    }


    /**
     * Persists buffered request entries to the destination by invoking
     * {@code submitRequestEntries} with batches formed according to the
     * user-specified buffering hints.
     */
    public void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
            // create a batch of request entries that should be persisted in the destination
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            for (int i = 0; i < MAX_BATCH_SIZE; i++) {
                RequestEntryT entry = bufferedRequestEntries.poll();
                if (entry == null) {
                    // if there are not enough elements, just create a smaller batch
                    break;
                }
                batch.add(entry);
            }

            logger.info("submit requests for {} elements", batch.size());

            // call the destination specific code that actually persists the request entries
            CompletableFuture<?> future = submitRequestEntries(batch);

            // keep track of in flight request
            inFlightRequests.put(future);

            // remove the request from the tracking queue once it completed
            future.whenComplete((response, err) -> {
                inFlightRequests.remove(future);
            });
        }
    }


    /**
     * In-flight requests may fail, but they will be retried if the sink is
     * still healthy.
     * <p>
     * To not lose any requests, there cannot be any outstanding in-flight
     * requests when a commit is initiated. To this end, all in-flight
     * requests need to be completed as part of the pre-commit.
     */
    @Override
    public List<Collection<CompletableFuture<?>>> prepareCommit(boolean flush) throws IOException {

        // reuse the current inFlightRequests as the committable and create an
        // empty queue to avoid copying and clearing
        List<Collection<CompletableFuture<?>>> committable = Collections.singletonList(inFlightRequests);

        // all in-flight requests are handled by the AsyncSinkCommiter and new
        // elements cannot be added to the queue during a commit, so it's safe to
        // create a new queue
        inFlightRequests = new LinkedBlockingDeque<>(...);

        return committable;
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
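To make the interplay of the write, flush, and requeueFailedRequestEntry methods above concrete, here is a minimal, self-contained sketch. It is a simplification under stated assumptions, not the proposed implementation: the destination call is synchronous and returns the failed entries directly, and the names BufferingSketch, Destination, and demo are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;

// Hypothetical, synchronous model of the buffering and re-queueing described above.
// It is not Flink's AsyncSinkWriter: the destination call blocks and returns failures directly.
public class BufferingSketch<InputT, RequestEntryT> {

    /** Hypothetical destination: persists a batch and returns the entries that failed transiently. */
    public interface Destination<E> {
        List<E> submit(List<E> batch);
    }

    private final int maxBatchSize;
    private final Function<InputT, RequestEntryT> elementConverter;
    private final Destination<RequestEntryT> destination;
    // Request entries (not raw inputs) are buffered, so failures can be re-queued as-is.
    final BlockingDeque<RequestEntryT> buffered = new LinkedBlockingDeque<>();
    // Every batch handed to the destination, recorded for illustration only.
    final List<List<RequestEntryT>> submittedBatches = new ArrayList<>();

    public BufferingSketch(int maxBatchSize,
                           Function<InputT, RequestEntryT> elementConverter,
                           Destination<RequestEntryT> destination) {
        this.maxBatchSize = maxBatchSize;
        this.elementConverter = elementConverter;
        this.destination = destination;
    }

    public void write(InputT element) throws InterruptedException {
        buffered.putLast(elementConverter.apply(element));
        flush();
    }

    public void flush() throws InterruptedException {
        while (buffered.size() >= maxBatchSize) {
            List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
            for (int i = 0; i < maxBatchSize; i++) {
                RequestEntryT entry = buffered.poll();
                if (entry == null) {
                    break; // not enough entries left: send a smaller batch
                }
                batch.add(entry);
            }
            submittedBatches.add(batch);
            List<RequestEntryT> failed = destination.submit(batch);
            // Re-queue failures at the head, back to front, so they keep their relative order.
            for (int i = failed.size() - 1; i >= 0; i--) {
                buffered.putFirst(failed.get(i));
            }
        }
    }

    /** Demo: the destination rejects "entry-2" once, simulating a transient failure. */
    public static BufferingSketch<Integer, String> demo() {
        Set<String> failedOnce = new HashSet<>();
        Destination<String> flaky = batch -> {
            List<String> failed = new ArrayList<>();
            for (String entry : batch) {
                if (entry.equals("entry-2") && failedOnce.add(entry)) {
                    failed.add(entry);
                }
            }
            return failed;
        };
        BufferingSketch<Integer, String> writer = new BufferingSketch<>(2, i -> "entry-" + i, flaky);
        try {
            for (int i = 1; i <= 4; i++) {
                writer.write(i);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return writer;
    }

    public static void main(String[] args) {
        BufferingSketch<Integer, String> writer = demo();
        System.out.println(writer.submittedBatches); // [[entry-1, entry-2], [entry-2, entry-3]]
        System.out.println(writer.buffered);         // [entry-4]
    }
}
```

The loop in flush only terminates if failures are transient: an entry that fails on every attempt would be re-queued forever, which is why reproducible errors have to be handled in submitRequestEntries rather than re-queued.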

...
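The in-flight bookkeeping and the prepareCommit hand-off from the AsyncSinkWriter internals can be sketched in isolation as follows. The class InFlightSketch and its methods track, awaitAll, and demo are hypothetical; awaitAll only assumes that the committer's job is to block until every handed-over future has completed.

```java
import java.util.Collection;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical model of in-flight tracking and the prepareCommit hand-off; not Flink's API.
public class InFlightSketch {

    // volatile: completion callbacks on other threads must see the queue installed by prepareCommit
    private volatile BlockingDeque<CompletableFuture<?>> inFlightRequests = new LinkedBlockingDeque<>();

    /** Tracks a submitted request; it leaves the live queue as soon as it completes. */
    public void track(CompletableFuture<?> future) throws InterruptedException {
        inFlightRequests.put(future);
        // Reads the field at completion time, like the writer above: a request that is still
        // pending when prepareCommit runs stays in the collection handed to the committer.
        future.whenComplete((response, err) -> inFlightRequests.remove(future));
    }

    /** Hands the current queue over as the committable and installs a fresh one (no copy, no clear). */
    public Collection<CompletableFuture<?>> prepareCommit() {
        Collection<CompletableFuture<?>> committable = inFlightRequests;
        inFlightRequests = new LinkedBlockingDeque<>();
        return committable;
    }

    /** Hypothetical committer step: blocks until every handed-over request has completed. */
    public static void awaitAll(Collection<CompletableFuture<?>> committable) {
        CompletableFuture.allOf(committable.toArray(new CompletableFuture<?>[0])).join();
    }

    /** Demo: returns how many requests were still pending at commit time. */
    public static int demo() {
        InFlightSketch writer = new InFlightSketch();
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> slow = new CompletableFuture<>();
        try {
            writer.track(fast);
            writer.track(slow);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        fast.complete("ok"); // runs the callback in this thread, removing it from the live queue
        Collection<CompletableFuture<?>> committable = writer.prepareCommit();
        int pendingAtCommit = committable.size(); // only "slow" is left
        new Thread(() -> slow.complete("ok")).start(); // finishes while the committer waits
        awaitAll(committable);
        return pendingAtCommit;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1
    }
}
```

Swapping the queue instead of copying it keeps prepareCommit cheap; the price is that the committed collection is the very object the writer used before the swap, which is only safe because, as noted above, no new elements are added during a commit.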

AmazonKinesisDataStreamWriter (Java)
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected CompletableFuture<?> submitRequestEntries(List<PutRecordsRequestEntry> requestEntries) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

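        // re-queue elements of failed requests
        CompletableFuture<PutRecordsResponse> handleResponse = future
            .whenComplete((response, err) -> {
                if (err != null) {
                    //TODO: handle errors of the entire request...
                    return;
                }

                if (response.failedRecordCount() > 0) {
                    // result entries are returned in the same order as the request entries
                    List<PutRecordsResultEntry> records = response.records();

                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            try {
                                requeueFailedRequestEntry(requestEntries.get(i));
                            } catch (InterruptedException e) {
                                // restore the interrupt flag and fail the returned future
                                Thread.currentThread().interrupt();
                                throw new IllegalStateException("interrupted while re-queueing", e);
                            }
                        }
                    }
                }
            });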

        // return future to track completion of async request
        return handleResponse;
    }

    ...
}
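Because the AWS SDK is not part of the JDK, the following sketch models the same response handling with hypothetical stand-in types (ResultEntry, handleResponse, demo) so that it runs on its own. It matches per-record results to request entries by index, as the writer above does, and it fills in the TODO with one possible policy (re-queue the whole batch when the entire call fails), which is an assumption of this sketch rather than part of the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Stdlib-only sketch of the response handling above. ResultEntry is a hypothetical stand-in
// for the SDK's PutRecordsResultEntry, so the example runs without the AWS SDK.
public class PartialFailureSketch {

    /** Hypothetical per-record result: errorCode is null when the record was persisted. */
    static final class ResultEntry {
        final String errorCode;

        ResultEntry(String errorCode) {
            this.errorCode = errorCode;
        }
    }

    /**
     * Attaches failure handling to an async batch call. Results are matched to request entries
     * by index, which relies on the destination returning one result per entry, in order.
     * Re-queueing the whole batch when the entire call fails is an assumption of this sketch;
     * a real sink must separate transient errors from fatal ones.
     */
    static <E> CompletableFuture<List<ResultEntry>> handleResponse(
            List<E> requestEntries,
            CompletableFuture<List<ResultEntry>> call,
            Consumer<E> requeue) {
        return call.whenComplete((results, err) -> {
            if (err != null) {
                requestEntries.forEach(requeue);
                return;
            }
            for (int i = 0; i < results.size(); i++) {
                if (results.get(i).errorCode != null) {
                    requeue.accept(requestEntries.get(i));
                }
            }
        });
    }

    /** Demo: one throttled record, then a batch whose entire call fails. */
    public static List<String> demo() {
        List<String> requeued = new ArrayList<>();

        CompletableFuture<List<ResultEntry>> partial = CompletableFuture.completedFuture(Arrays.asList(
                new ResultEntry(null),
                new ResultEntry("ProvisionedThroughputExceededException"),
                new ResultEntry(null)));
        handleResponse(Arrays.asList("a", "b", "c"), partial, requeued::add).join();

        CompletableFuture<List<ResultEntry>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("connection reset"));
        handleResponse(Arrays.asList("d", "e"), failed, requeued::add)
                .exceptionally(t -> null) // swallow the propagated failure for this demo
                .join();

        return requeued;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [b, d, e]
    }
}
```

Checking err before touching the response matters: when the whole call fails, whenComplete receives a null response, so dereferencing it first would throw a NullPointerException inside the callback.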


Rejected Alternatives

The sink needs a way to build up backpressure, e.g., if the throughput limit of the destination is exceeded. Initially we were planning to adopt the isAvailable pattern from the source interface. But this

isAvailable pattern (Java)
    /**
     * Signals whether enough RequestEntryTs have been buffered according to the
     * user-specified buffering hints to make a request against the destination.
     * This functionality will be added to the sink interface by means of an
     * additional FLIP.
     *
     * @return a future that will be completed once a request against the
     * destination can be made
     */
    public CompletableFuture<Void> isAvailable() {
        ...
    }
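For reference, a future-based availability signal like the one in the signature above could be driven as sketched below. This is a hypothetical illustration (AvailabilitySketch, add, takeBatch, demo), with a single batch-size threshold standing in for the user-specified buffering hints; it does not describe how Flink implements availability elsewhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the isAvailable() pattern discussed above; not a Flink API.
public class AvailabilitySketch<E> {

    private final int batchSize; // stand-in for the user-specified buffering hints
    private final List<E> buffer = new ArrayList<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    public AvailabilitySketch(int batchSize) {
        this.batchSize = batchSize;
    }

    public synchronized void add(E entry) {
        buffer.add(entry);
        if (buffer.size() >= batchSize) {
            available.complete(null); // signal that a full batch can be sent
        }
    }

    /** Returns a future that completes once enough entries are buffered for one request. */
    public synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }

    /** Removes up to one batch; hands out a fresh, incomplete future if less than a batch remains. */
    public synchronized List<E> takeBatch() {
        List<E> head = buffer.subList(0, Math.min(batchSize, buffer.size()));
        List<E> batch = new ArrayList<>(head);
        head.clear(); // removes the taken entries from the buffer
        if (buffer.size() < batchSize && available.isDone()) {
            available = new CompletableFuture<>();
        }
        return batch;
    }

    /** Demo: availability before, at, and after reaching the batch size. */
    public static boolean[] demo() {
        AvailabilitySketch<String> sink = new AvailabilitySketch<>(2);
        CompletableFuture<Void> first = sink.isAvailable();
        sink.add("a");
        boolean afterOne = first.isDone();
        sink.add("b");
        boolean afterTwo = first.isDone();
        sink.takeBatch();
        boolean afterDrain = sink.isAvailable().isDone();
        return new boolean[] {afterOne, afterTwo, afterDrain};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [false, true, false]
    }
}
```

A caller would wait on isAvailable() before draining a batch; once the buffer falls below the threshold again, a new incomplete future is handed out, so callers never see a stale "available" signal.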

...