THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, Collection<CompletableFuture<?>>, Collection<RequestEntryT>> {
/**
* This method specifies how to persist buffered request entries into the
* destination. It is implemented when support for a new destination is
* added.
* <p>
* The method is invoked with a set of request entries according to the
* buffering hints (and the valid limits of the destination). The logic then
* needs to create and execute the request against the destination (ideally
* by batching together multiple request entries to increase efficiency).
* The logic also needs to identify individual request entries that were not
* persisted successfully and resubmit them using the {@code
* requeueFailedRequestEntry} method.
* <p>
* The method returns a future that indicates, once completed, that all
* request entries that have been passed to the method on invocation have
* either been successfully persisted in the destination or have been
* re-queued.
* <p>
* During checkpointing, the sink needs to ensure that there are no
* outstanding in-flight requests, i.e., that all futures returned by this
* method are completed.
*
* @param requestEntries the list of request entries that should be sent to
* the API endpoint
* @return a future that completes when all request entries have been
* successfully persisted to the API or were re-queued
*/
protected abstract
CompletableFuture<?> submitRequestEntries(List<RequestEntryT> requestEntries);
/**
* The ElementConverter provides a mapping from the elements of a stream to
* request entries that can be sent to the destination.
* <p>
* The resulting request entry is buffered by the AsyncSinkWriter and sent
* to the destination when the {@code submitRequestEntries} method is
* invoked.
*/
private final ElementConverter<InputT, RequestEntryT> elementConverter;
...
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class AmazonKinesisDataStreamSink<InputT> extends AsyncSink<InputT, PutRecordsRequestEntry> {
/**
 * Sends the given request entries to the stream as a single batched
 * {@code PutRecordsRequest} and re-queues every entry whose result entry
 * carries an error code.
 * <p>
 * If the request as a whole fails, no response is available and nothing is
 * re-queued here; the returned future then completes exceptionally with the
 * original error so the caller can observe the failure.
 *
 * @param requestEntries the request entries to send in one batch
 * @return a future that completes once the response has been processed and
 * failed entries have been re-queued, or completes exceptionally if the
 * whole request failed
 */
@Override
protected CompletableFuture<?> submitRequestEntries(List<PutRecordsRequestEntry> requestEntries) {
    // create a batch request
    PutRecordsRequest batchRequest = PutRecordsRequest
            .builder()
            .records(requestEntries)
            .streamName(streamName)
            .build();

    // call api with batch request
    CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

    // re-queue elements of failed requests
    CompletableFuture<PutRecordsResponse> handleResponse = future
            .whenComplete((response, err) -> {
                if (err != null) {
                    // The whole request failed, so response is null and must not
                    // be dereferenced. The error still propagates through the
                    // future returned by whenComplete.
                    //TODO: handle errors of the entire request...
                    return;
                }

                if (response.failedRecordCount() > 0) {
                    // Result entries are matched to request entries by index;
                    // presumably the service returns them in request order
                    // -- confirm against the PutRecords API reference.
                    List<PutRecordsResultEntry> records = response.records();

                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            // NOTE(review): the AsyncSinkWriter Javadoc calls this
                            // hook requeueFailedRequestEntry -- confirm the name.
                            requeueFailedRequest(requestEntries.get(i));
                        }
                    }
                }
            });

    // return future to track completion of async request
    return handleResponse;
}
...
} |
...