THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
@Override
protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {
// create a batch request
PutRecordsRequest batchRequest = PutRecordsRequest
.builder()
.records(requestEntries)
.streamName(streamName)
.build();
// call api with batch request
CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);
// re-queue elements of failed requests
future.whenComplete((response, err) -> {
if (response.failedRecordCount() > 0) {
LOG.info("Re-queueing {} messages", response.failedRecordCount());
ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
List<PutRecordsResultEntry> records = response.records();
for (int i = 0; i < records.size(); i++) {
if (records.get(i).errorCode() != null) {
failedRequestEntries.add(requestEntries.get(i));
}
}
requestResult.complete(failedRequestEntries);
} else {
requestResult.complete(Collections.emptyList());
}
//TODO: handle errors of the entire request...
});
}
...
} |
...