THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class AmazonKinesisDataStreamSink<InputT> extends AsyncSink<InputT, PutRecordsRequestEntry> { @Override protected CompletableFuture<?> submitRequestEntries(List<PutRecordsRequestEntry> requestEntries) { // create a batch request PutRecordsRequest batchRequest = PutRecordsRequest .builder() .records(requestEntries) .streamName(streamName) .build(); // call api with batch request CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest); // re-queue elements of failed requests CompletableFuture<PutRecordsResponse> handleResponse = future .whenComplete((response, err) -> { if (response.failedRecordCount() > 0) { List<PutRecordsResultEntry> records = response.records(); for (int i = 0; i < records.size(); i++) { if (records.get(i).errorCode() != null) { requeueFailedRequest(requestsrequestEntries.get(i)); } } } //TODO: handle errors of the entire request... }); // return future to track completion of async request return handleResponse; } } |
...