/**
 * The ElementConverter maps the elements of the stream to request entries
 * that can be sent to the destination.
 * <p>
 * The resulting request entry is buffered by the AsyncSinkWriter and sent
 * to the destination when the {@code submitRequestEntries} method is
 * invoked.
 */
private final ElementConverter<InputT, RequestEntryT> elementConverter;
/**
 * Buffer to hold request entries that should be persisted into the
 * destination.
 * <p>
 * A request entry contains all relevant details to make a call to the
 * destination. E.g., for Kinesis Data Streams a request entry contains the
 * payload and partition key.
 * <p>
 * It seems more natural to buffer InputT, i.e., the events that should be
 * persisted, rather than RequestEntryT. However, in practice, the response
 * of a failed request call can make it very hard, if not impossible, to
 * reconstruct the original event. It is much easier to just construct a
 * new (retry) request entry from the response and add that back to the
 * queue for later retry.
 * <p>
 * New entries are appended at the tail of this deque. Re-queued (failed)
 * entries are inserted at the head. Batches are taken from the head.
 * <p>
 * NOTE(review): the constructor argument is elided here; presumably it is a
 * capacity bound, in which case the blocking put operations on this deque
 * wait for free space. Confirm.
 */
private final BlockingDeque<RequestEntryT> bufferedRequestEntries = new LinkedBlockingDeque<>(...);
/**
 * Tracks all pending async calls that have been executed since the last
 * checkpoint. Calls that already completed (successfully or unsuccessfully)
 * are automatically removed from the queue. Any request entry that was not
 * successfully persisted needs to be handled and retried by the logic in
 * {@code submitRequestEntries}.
 * <p>
 * There is a limit on the number of concurrent (async) requests that can be
 * handled by the client library. This limit is meant to be enforced through
 * the size of this queue.
 * <p>
 * NOTE(review): flush() submits a request before inserting its future here.
 * If this deque is bounded (its constructor argument is elided), a full
 * deque therefore blocks the writer only after one extra request has already
 * been issued. Confirm this is the intended bound.
 * <p>
 * To complete a checkpoint, we need to make sure that no requests are in
 * flight, as they may fail, which could then lead to data loss.
 */
private BlockingDeque<CompletableFuture<?>> inFlightRequests = new LinkedBlockingDeque<>(...);
/**
 * Converts the element into a request entry and appends it to the tail of
 * the buffer, then calls {@link #flush()}. flush() decides whether enough
 * entries are buffered to submit a batch.
 * <p>
 * NOTE(review): putLast blocks while the buffer is full, assuming the buffer
 * is bounded.
 */
@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
    final RequestEntryT requestEntry = elementConverter.apply(element, context);
    bufferedRequestEntries.putLast(requestEntry);
    flush();
}
/**
 * Puts a request entry that failed with a transient error back into the
 * buffer, so that it is not lost and can be retried later.
 * <p>
 * Either the whole request fails, or only some of its request entries are
 * not persisted, e.g. because of network issues or service-side throttling.
 * Every entry that failed transiently has to be re-queued through this
 * method.
 * <p>
 * Entries that fail reproducibly with the same error (e.g. ill-formed
 * request entries) must NOT be re-queued. That error has to be handled in
 * the logic of {@code submitRequestEntries}; otherwise such entries would be
 * retried indefinitely, always causing the same error.
 * <p>
 * The entry is inserted at the head of the buffer, ahead of everything
 * already buffered. Re-queueing several entries one at a time therefore
 * reverses their relative order.
 *
 * @param requestEntry the failed entry to retry
 * @throws InterruptedException if interrupted while blocked waiting for
 *     space in the buffer
 */
protected void requeueFailedRequestEntry(RequestEntryT requestEntry) throws InterruptedException {
    final BlockingDeque<RequestEntryT> buffer = bufferedRequestEntries;
    buffer.putFirst(requestEntry);
}
/**
 * Persists buffered request entries into the destination by invoking
 * {@code submitRequestEntries} with batches of up to {@code MAX_BATCH_SIZE}
 * entries.
 * <p>
 * Batches are only formed while at least {@code MAX_BATCH_SIZE} entries are
 * buffered. Any smaller remainder stays in the buffer for a later call.
 * <p>
 * The future of every submitted request is tracked in
 * {@code inFlightRequests}. It is removed again once it completes.
 *
 * @throws InterruptedException if interrupted while waiting for space in the
 *     in-flight request queue
 */
public void flush() throws InterruptedException {
    while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
        // Take up to MAX_BATCH_SIZE entries from the head of the buffer.
        // drainTo simply yields a smaller batch if fewer entries are available,
        // so no exception-driven control flow is needed.
        ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);
        bufferedRequestEntries.drainTo(batch, MAX_BATCH_SIZE);
        if (batch.isEmpty()) {
            // Nothing could be taken (e.g. MAX_BATCH_SIZE == 0). Stop instead of
            // submitting empty requests in an endless loop.
            break;
        }

        logger.info("submit requests for {} elements", batch.size());
        // Call the destination-specific code that actually persists the request entries.
        CompletableFuture<?> future = submitRequestEntries(batch);
        // Track the request while it is in flight.
        inFlightRequests.put(future);
        // Stop tracking the request once it has completed, whether successfully or not.
        future.whenComplete((response, err) -> inFlightRequests.remove(future));
    }
}
/**
 * Hands all requests that are currently in flight over to the committer.
 * <p>
 * In-flight requests may fail, but they will be retried if the sink is still
 * healthy. To not lose any requests, there cannot be any outstanding
 * in-flight requests when a commit is initialized. The futures of all
 * in-flight requests are therefore moved into the returned committable, so
 * they can be awaited as part of the commit.
 * <p>
 * The in-flight deque is emptied with {@code drainTo} rather than replaced.
 * This keeps the field's declared {@code BlockingDeque} type and its
 * capacity.
 *
 * @param flush currently ignored
 * @return a single-element list holding the futures that were in flight
 */
@Override
public List<Collection<CompletableFuture<?>>> prepareCommit(boolean flush) throws IOException {
    // NOTE(review): the flush flag is not used. Entries still buffered below
    // MAX_BATCH_SIZE are therefore not submitted here; confirm whether the
    // end-of-input case must send them.
    List<CompletableFuture<?>> committable = new ArrayList<>(inFlightRequests.size());
    // Move every in-flight future into the committable and empty the deque.
    // Futures that complete afterwards still run their whenComplete callback.
    // That callback then finds nothing to remove from the drained deque.
    inFlightRequests.drainTo(committable);
    return Collections.singletonList(committable);
}
/**
 * Captures the request entries that are still waiting in the internal
 * buffer, so that they are not lost on a failure/restart of the application.
 * <p>
 * In-flight requests are handed over to the committer in prepareCommit.
 * Entries that have not been sent to the endpoint yet are stored in the
 * snapshot state instead.
 *
 * @return a single-element list holding a copy of the buffered entries, in
 *     buffer order (head first)
 */
@Override
public List<Collection<RequestEntryT>> snapshotState() throws IOException {
    // Return a point-in-time copy rather than the live deque. write(), flush()
    // and requeueFailedRequestEntry() keep mutating the buffer after this call
    // returns, and those changes must not leak into the captured state.
    return Collections.singletonList(new ArrayList<RequestEntryT>(bufferedRequestEntries));
}
|