THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class AmazonKinesisDataStreamSink<InputT> extends AsyncSink<InputT, PutRecordsRequestEntry> {
@Override
protected CompletableFuture<?>
submitRequestEntries(List<PutRecordsRequestEntry> requestEntries) {
// create a batch request
PutRecordsRequest batchRequest = PutRecordsRequest
.builder()
.records(requestEntries)
.streamName(streamName)
.build();
// call api with batch request
CompletableFuture<PutRecordsResponse> future =
client.putRecords(batchRequest);
// re-queue elements of failed requests
CompletableFuture<PutRecordsResponse> handleResponse = future
.whenComplete((response, err) -> {
if (response.failedRecordCount() > 0) {
List<PutRecordsResultEntry> records = response.records();
for (int i = 0; i < records.size(); i++) {
if (records.get(i).errorCode() != null) {
requeueFailedRequest(requests.get(i));
}
}
}
//TODO: handle errors of the entire request...
});
// return future to track completion of async request
return handleResponse;
}
} |
...