...

There are two user-facing aspects of the generic sink: first, an abstract class that is used to implement a new sink for a concrete destination; second, the interface used by end users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation of a sink for Kinesis Data Streams.
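To make the split between the two roles concrete, here is a minimal, self-contained sketch that does not use the Flink API. `Converter`, `SimpleAsyncWriter`, and `InMemoryWriter` are hypothetical stand-ins for `ElementConverter` and `AsyncSinkWriter`, and the converter drops the `SinkWriter.Context` argument for brevity. A sink implementer only supplies `submitRequestEntries`; an end user only supplies the converter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for the end-user-facing ElementConverter (no SinkWriter.Context).
interface Converter<InputT, RequestEntryT> extends Function<InputT, RequestEntryT> {}

// Hypothetical, heavily simplified stand-in for the abstract AsyncSinkWriter.
abstract class SimpleAsyncWriter<InputT, RequestEntryT> {
    private final Converter<InputT, RequestEntryT> converter;
    private final List<RequestEntryT> buffer = new ArrayList<>();

    SimpleAsyncWriter(Converter<InputT, RequestEntryT> converter) {
        this.converter = converter;
    }

    // Converts each element with the user-supplied converter and buffers the request entry.
    void write(InputT element) {
        buffer.add(converter.apply(element));
    }

    // Sends everything buffered so far as one batch and clears the buffer.
    CompletableFuture<?> flush() {
        List<RequestEntryT> batch = new ArrayList<>(buffer);
        buffer.clear();
        return submitRequestEntries(batch);
    }

    // Implementer-facing hook: the only destination-specific logic.
    protected abstract CompletableFuture<?> submitRequestEntries(List<RequestEntryT> requestEntries);
}

// A concrete "destination" that simply collects the entries in memory.
class InMemoryWriter<InputT> extends SimpleAsyncWriter<InputT, String> {
    final List<String> persisted = new ArrayList<>();

    InMemoryWriter(Converter<InputT, String> converter) {
        super(converter);
    }

    @Override
    protected CompletableFuture<?> submitRequestEntries(List<String> requestEntries) {
        persisted.addAll(requestEntries);
        return CompletableFuture.completedFuture(null);
    }
}
```

For example, `new InMemoryWriter<Integer>(i -> "entry-" + i)` is a complete "sink" for integers: the lambda is the end user's part, the subclass is the implementer's part.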

...

AsyncSinkWriter Internals

```java
    /**
     * The ElementConverter provides a mapping from the elements of a stream to
     * request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. Eg, for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, ie, the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     * <p>
     * The buffer is a BlockingDeque because {@code write} relies on the
     * blocking {@code putLast}, which ConcurrentLinkedDeque does not provide.
     */
    private final BlockingDeque<RequestEntryT> bufferedRequestEntries = new LinkedBlockingDeque<>(...);


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that already completed (successfully or unsuccessfully)
     * are automatically removed from the queue. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking the
     * size of this queue before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private Queue<CompletableFuture<?>> inFlightRequests = new ConcurrentLinkedQueue<>(...);


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // convert the element and append it to the buffer;
        // putLast waits for space if the buffer is bounded and full
        bufferedRequestEntries.putLast(elementConverter.apply(element, context));
    }


    /**
     * The entire request may fail or single request entries that are part of
     * the request may not be persisted successfully, eg, because of network
     * issues or service side throttling. All request entries that failed with
     * transient failures need to be re-queued with this method so that they
     * aren't lost and can be retried later.
     * <p>
     * Request entries that are causing the same error in a reproducible manner,
     * eg, ill-formed request entries, must not be re-queued but the error needs
     * to be handled in the logic of {@code submitRequestEntries}. Otherwise
     * these request entries will be retried indefinitely, always causing the
     * same error.
     */
    protected void requeueFailedRequestEntry(RequestEntryT requestEntry) {
        // add to the head of the buffer so the entry is retried before newer entries
        bufferedRequestEntries.offerFirst(requestEntry);
    }


    /**
     * In-flight requests may fail, but they will be retried if the sink is
     * still healthy.
     * <p>
     * To not lose any requests, there cannot be any outstanding in-flight
     * requests when a commit is initialized. To this end, all in-flight
     * requests need to be completed as part of the pre commit.
     */
    @Override
    public List<Collection<CompletableFuture<?>>> prepareCommit(boolean flush) throws IOException {

        // reuse current inFlightRequests as committable and create an empty queue
        // to avoid copying and clearing
        List<Collection<CompletableFuture<?>>> committable = Collections.singletonList(inFlightRequests);

        // all in-flight requests are handled by the AsyncSinkCommitter and new
        // elements cannot be added to the queue during a commit, so it's safe to
        // create a new queue
        inFlightRequests = new ConcurrentLinkedQueue<>();

        return committable;
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
```
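The interplay of the buffer, the in-flight queue, `prepareCommit`, and `snapshotState` can be exercised without Flink or a real destination. The sketch below is a hypothetical, single-threaded model: `MiniWriter`, `maxInFlightRequests`, and `sendBatch` are illustrative names that do not come from the design above, request entries are plain strings, and the "client" is a function returning a future that the caller can complete by hand to emulate an in-flight call.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

// Hypothetical, single-threaded model of the AsyncSinkWriter bookkeeping (not Flink code).
class MiniWriter {
    final Deque<String> bufferedRequestEntries = new ConcurrentLinkedDeque<>();
    Queue<CompletableFuture<?>> inFlightRequests = new ConcurrentLinkedQueue<>();
    private final int maxInFlightRequests; // hypothetical client-side concurrency limit

    MiniWriter(int maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
    }

    // Toy element converter: the request entry is the upper-cased element.
    void write(String element) {
        bufferedRequestEntries.offerLast(element.toUpperCase());
    }

    // Issues one request with up to batchSize entries; returns false if the in-flight limit is reached.
    boolean sendBatch(int batchSize, Function<List<String>, CompletableFuture<?>> client) {
        inFlightRequests.removeIf(CompletableFuture::isDone); // completed calls leave the queue
        if (inFlightRequests.size() >= maxInFlightRequests) {
            return false;
        }
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !bufferedRequestEntries.isEmpty()) {
            batch.add(bufferedRequestEntries.pollFirst());
        }
        inFlightRequests.add(client.apply(batch));
        return true;
    }

    // Failed entries go to the head of the buffer, ahead of newer entries.
    void requeueFailedRequestEntry(String requestEntry) {
        bufferedRequestEntries.offerFirst(requestEntry);
    }

    // Hands the current in-flight queue over as committable and starts a fresh one.
    List<Collection<CompletableFuture<?>>> prepareCommit() {
        List<Collection<CompletableFuture<?>>> committable = Collections.singletonList(inFlightRequests);
        inFlightRequests = new ConcurrentLinkedQueue<>();
        return committable;
    }

    // Entries that have not been sent yet become part of the snapshot.
    List<Collection<String>> snapshotState() {
        return Collections.singletonList(bufferedRequestEntries);
    }
}
```

With a limit of one in-flight request, a second `sendBatch` call is refused until the first future completes; an entry passed to `requeueFailedRequestEntry` ends up at the front of what `snapshotState` returns.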

Limitations

  • The sink is designed for destinations that provide an async client. Destinations that cannot ingest events in an async fashion cannot be supported by the sink.
  • The sink usually persists InputTs in the order they are added to the sink, but reorderings may occur, eg, when RequestEntryTs need to be retried.
  • We are not focusing on support for exactly-once semantics beyond simple upsert-capable and idempotent destinations at this point.
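To see how a retry can reorder entries, the following hypothetical, single-threaded simulation (no Flink, no real destination) issues two batches before the failure in the first one is known. The failing entry is re-queued at the head of the buffer, as `requeueFailedRequestEntry` does, but it is still persisted after the entries of the second batch. The class and method names are made up for illustration, and the batch size and number of concurrent requests are both fixed at 2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReorderingDemo {
    // Entries in transientFailures fail on their first attempt only; returns the persisted order.
    static List<String> persistOrder(List<String> input, Set<String> transientFailures) {
        Deque<String> buffer = new ArrayDeque<>(input);
        List<String> persisted = new ArrayList<>();
        Set<String> failedOnce = new HashSet<>();
        while (!buffer.isEmpty()) {
            // Two requests of up to two entries are issued back to back, so both are in flight.
            List<List<String>> inFlight = new ArrayList<>();
            for (int r = 0; r < 2 && !buffer.isEmpty(); r++) {
                List<String> batch = new ArrayList<>();
                while (batch.size() < 2 && !buffer.isEmpty()) {
                    batch.add(buffer.pollFirst());
                }
                inFlight.add(batch);
            }
            // Responses arrive; failed entries are re-queued at the head of the buffer.
            for (List<String> batch : inFlight) {
                for (String entry : batch) {
                    if (transientFailures.contains(entry) && failedOnce.add(entry)) {
                        buffer.offerFirst(entry);
                    } else {
                        persisted.add(entry);
                    }
                }
            }
        }
        return persisted;
    }
}
```

With input `e1, e2, e3, e4` and a transient failure of `e2`, the persisted order is `e1, e3, e4, e2`: re-queuing at the head only keeps the retried entry ahead of entries that are still buffered, not ahead of entries that were already in flight.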

Appendix A – Simplified example implementation of a sink for Kinesis Data Streams

Example ElementConverter for Kinesis Data Streams

This is the functionality that needs to be implemented by end users of the sink. It specifies how an element of a DataStream is mapped to a PutRecordsRequestEntry that can be submitted to the Kinesis Data Streams API.

ElementConverter for Kinesis Data Streams

```java
new ElementConverter<InputT, PutRecordsRequestEntry>() {
    @Override
    public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
        // payload: UTF-8 bytes of the element's string representation
        // partition key: derived from the element's hash code
        return PutRecordsRequestEntry
                .builder()
                .data(SdkBytes.fromUtf8String(element.toString()))
                .partitionKey(String.valueOf(element.hashCode()))
                .build();
    }
}
```
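For experimenting with this mapping without the AWS SDK, the same conversion can be mirrored with plain Java types. In this hypothetical sketch `RecordEntry` stands in for `PutRecordsRequestEntry`, and the converter is a plain `java.util.function.Function` rather than Flink's `ElementConverter`, so the `SinkWriter.Context` argument is omitted. Like the Kinesis example, it derives the partition key from `hashCode()`, so equal elements always get the same partition key and therefore go to the same shard.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for PutRecordsRequestEntry: payload plus partition key.
final class RecordEntry {
    final byte[] data;
    final String partitionKey;

    RecordEntry(byte[] data, String partitionKey) {
        this.data = data;
        this.partitionKey = partitionKey;
    }
}

final class KinesisLikeConverter {
    // Mirrors the example above: UTF-8 payload of toString(), partition key from hashCode().
    static <InputT> Function<InputT, RecordEntry> create() {
        return element -> new RecordEntry(
                element.toString().getBytes(StandardCharsets.UTF_8),
                String.valueOf(element.hashCode()));
    }
}
```

Using `hashCode()` as the partition key is simple, but distinct elements with colliding hash codes share a key; a real sink would more likely let the user choose the key.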

Simplified AsyncSinkWriter for Kinesis Data Streams

This is a simplified sample implementation of the AsyncSinkWriter for Kinesis Data Streams. Given a set of buffered PutRecordsRequestEntry objects, it creates and submits a batch request against the Kinesis Data Streams API using the KinesisAsyncClient. The response of the API call is then checked for events that were not persisted successfully (eg, because of throttling or network failures) and those events are added back to the internal buffer of the AsyncSinkWriter.

AmazonKinesisDataStreamWriter

```java
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected CompletableFuture<?> submitRequestEntries(List<PutRecordsRequestEntry> requestEntries) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        CompletableFuture<PutRecordsResponse> handleResponse = future
            .whenComplete((response, err) -> {
                if (err != null) {
                    // the entire request failed, so response is null here
                    //TODO: handle errors of the entire request...
                    return;
                }
                if (response.failedRecordCount() > 0) {
                    // result entries are in the same order as the request entries
                    List<PutRecordsResultEntry> records = response.records();
                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            requeueFailedRequestEntry(requestEntries.get(i));
                        }
                    }
                }
            });

        // return future to track completion of async request
        return handleResponse;
    }

    ...
}
```
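The re-queue logic only relies on the positional correspondence between request entries and result entries. The hypothetical sketch below isolates it so it can run without the AWS SDK: the "response" is a list with one error code per request entry (`null` meaning persisted), and the re-queued entries are collected in a list instead of calling `requeueFailedRequestEntry`. It checks the `Throwable` first, because `whenComplete` passes a `null` result when the whole call fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the response handling; not thread-safe, fine for a single-threaded sketch.
class PartialFailureDemo {
    final List<String> requeued = new ArrayList<>();
    final List<Throwable> requestErrors = new ArrayList<>();

    // errorCodes holds one entry per request entry; null means the entry was persisted.
    CompletableFuture<List<String>> handle(List<String> requestEntries,
                                           CompletableFuture<List<String>> future) {
        return future.whenComplete((errorCodes, err) -> {
            if (err != null) {
                // The whole call failed; errorCodes is null in this case.
                requestErrors.add(err);
                return;
            }
            for (int i = 0; i < errorCodes.size(); i++) {
                if (errorCodes.get(i) != null) {
                    requeued.add(requestEntries.get(i)); // stand-in for requeueFailedRequestEntry
                }
            }
        });
    }
}
```

If only the second of three entries reports an error such as `ProvisionedThroughputExceededException`, only that entry is re-queued; a call that fails as a whole is recorded separately and leaves the returned future completed exceptionally.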


Rejected Alternatives


isAvailable pattern

```java
    /**
     * Signals if enough RequestEntryTs have been buffered according to the
     * user-specified buffering hints to make a request against the destination.
     * This functionality will be added to the sink interface by means of an
     * additional FLIP.
     *
     * @return a future that will be completed once a request against the
     * destination can be made
     */
    public CompletableFuture<Void> isAvailable() {
        ...
    }
```
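One way to read the rejected `isAvailable` contract is a future that the writer completes once the buffered entries satisfy the buffering hint. The sketch below assumes a single hint, a hypothetical `maxBatchSize`, and hands out a fresh future only after a completed one has been consumed by draining a batch; `AvailabilityBuffer` and `takeBatch` are illustrative names, not part of the proposed interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AvailabilityBuffer<T> {
    private final int maxBatchSize; // hypothetical buffering hint
    private final Deque<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    AvailabilityBuffer(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    synchronized void add(T entry) {
        buffer.addLast(entry);
        if (buffer.size() >= maxBatchSize) {
            available.complete(null); // a request against the destination can be made now
        }
    }

    synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }

    // Drains one batch; re-arms the future only if it was completed and the rest is below the hint.
    synchronized List<T> takeBatch() {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
            batch.add(buffer.pollFirst());
        }
        if (available.isDone() && buffer.size() < maxBatchSize) {
            available = new CompletableFuture<>();
        }
        return batch;
    }
}
```

Replacing only a completed future matters: a caller still waiting on an uncompleted future would otherwise never be notified.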