Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: AsyncSinkWriter
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements SinkWriter<InputT, Collection<CompletableFuture<?>>, Collection<RequestEntryT>> {

    /**
     * Persists buffered request entries in the destination. Every new
     * destination supplies its own implementation of this method.
     * <p>
     * The entries handed to the method are selected according to the buffering
     * hints and the valid limits of the destination. An implementation builds
     * a request from them and executes it against the destination, ideally
     * batching several request entries into one request for efficiency. It is
     * also responsible for detecting the individual request entries that were
     * not persisted successfully and for resubmitting them through the {@code
     * requeueFailedRequestEntry} method.
     * <p>
     * Completion of the returned future signals that every request entry
     * passed in on this invocation has either been persisted successfully in
     * the destination or been re-queued.
     * <p>
     * During checkpointing the sink must make sure that no in-flight requests
     * are outstanding, i.e., that all futures returned by this method have
     * completed.
     *
     * @param requestEntries the request entries that should be sent to the API
     *                       endpoint
     * @return a future that completes once all request entries have been
     * successfully persisted to the API or were re-queued
     */
    protected abstract CompletableFuture<?> submitRequestEntries(List<RequestEntryT> requestEntries);


    /**
     * Maps the elements of a stream to request entries that can be sent to the
     * destination.
     * <p>
     * The AsyncSinkWriter buffers each resulting request entry and sends it to
     * the destination when the {@code submitRequestEntries} method is invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;

    ...
}

...

Code Block
language: java
title: AmazonKinesisDataStreamSink
/**
 * Example sink that writes request entries to an Amazon Kinesis data stream
 * using batched {@code PutRecordsRequest}s.
 * <p>
 * NOTE(review): this class extends {@code AsyncSink}, whose declaration is not
 * shown here; confirm that it declares the {@code submitRequestEntries} hook
 * overridden below.
 */
public class AmazonKinesisDataStreamSink<InputT> extends AsyncSink<InputT, PutRecordsRequestEntry> {

    /**
     * Sends the given request entries to the stream in a single batch request
     * and re-queues every entry whose result entry carries an error code.
     *
     * @param requestEntries the request entries to send in one batch
     * @return a future that completes after the response has been inspected
     * and the failed entries have been re-queued; if the request as a whole
     * failed, the future completes exceptionally with that failure
     */
    @Override
    protected CompletableFuture<?>
    submitRequestEntries(List<PutRecordsRequestEntry> requestEntries) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        CompletableFuture<PutRecordsResponse> handleResponse = future
            .whenComplete((response, err) -> {
                if (err != null || response == null) {
                    // The request failed as a whole, so there is no response to
                    // inspect. Dereferencing the null response here would only
                    // raise a NullPointerException inside the callback; the
                    // original failure is propagated by the returned future.

                    //TODO: handle errors of the entire request...
                    return;
                }

                if (response.failedRecordCount() > 0) {
                    List<PutRecordsResultEntry> records = response.records();

                    // Result entry i is matched with request entry i; this
                    // presumes the response lists results in request order.
                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            requeueFailedRequest(requestEntries.get(i));
                        }
                    }
                }
            });

        // return future to track completion of async request
        return handleResponse;
    }

    ...
}

...