THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { @Override protected CompletableFuture<?>void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<?> requestResult) { // create a batch request PutRecordsRequest batchRequest = PutRecordsRequest .builder() .records(requestEntries) .streamName(streamName) .build(); // call api with batch request CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest); // re-queue elements of failed requests future.whenComplete((response, err) -> { if (response.failedRecordCount() > 0) { List<PutRecordsResultEntry> records = response.records(); for (int i = 0; i < records.size(); i++) { if (records.get(i).errorCode() != null) { requeueFailedRequest(requestEntries.get(i)); } } } requestResult.complete(Collections.emptyList()); //TODO: handle errors of the entire request... }); } ... } |
...