Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleAmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
                if (response.failedRecordCount() > 0) {
                    LOG.info("Re-queueing {} messages", response.failedRecordCount());
    
                    ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                    List<PutRecordsResultEntry> records = response.records();
    
                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            failedRequestEntries.add(requestEntries.get(i));
                        }
                    }
    
                    requestResult.complete(failedRequestEntries);
                } else {
                    requestResult.complete(Collections.emptyList());
                }

                //TODO: handle errors of the entire request...
            });
    }

    ...
}

...