...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Buffer to hold request entries that should be persisted into the
* destination.
* <p>
* A request entry contains all relevant details to make a call to the
* destination. E.g., for Kinesis Data Streams, a request entry contains the
* payload and partition key.
* <p>
* It seems more natural to buffer InputT, i.e., the events that should be
* persisted, rather than RequestEntryT. However, in practice, the response
* of a failed request call can make it very hard, if not impossible, to
* reconstruct the original event. It is much easier to construct a
* new (retry) request entry from the response and add that back to the
* queue for later retry.
*/
private final Deque<RequestEntryT> bufferedRequests = new ConcurrentLinkedDeque<>();
/**
* Tracks all pending async calls that have been executed since the last
* checkpoint. Calls that already completed (successfully or unsuccessfully)
* are automatically removed from the queue. Any request entry that was not
* successfully persisted needs to be handled and retried by the logic in
* {@code submitRequestsToApi}.
* <p>
* There is a limit on the number of concurrent (async) requests that can be
* handled by the client library. This limit must be checked before issuing
* new requests.
* <p>
* To complete a checkpoint, we need to make sure that no requests are in
* flight, as they may fail, which could then lead to data loss.
*/
private Queue<CompletableFuture<?>> inFlightRequests = new ConcurrentLinkedQueue<>();
/**
* Signals if enough RequestEntryTs have been buffered according to the
* user-specified buffering hints to make a request against the destination.
* This functionality will be added to the sink interface by means of an
* additional FLIP.
*
* @return a future that will be completed once there are records available
* to make a request against the destination
*/
public CompletableFuture<Void> isAvailable() {
...
}
@Override
public void write(InputT element, Context context) throws IOException {
bufferedRequests.offerLast(elementConverter.apply(element, context));
}
/**
* An entire request, or individual request entries within it, may fail,
* e.g., because of network issues or service-side throttling. All request
* entries that failed with transient failures need to be re-queued with this
* method so that they aren't lost and can be retried later.
* <p>
* Request entries that cause the same error in a reproducible manner,
* e.g., ill-formed request entries, must not be re-queued; instead, the error
* needs to be handled in the logic of {@code submitRequestEntries}. Otherwise
* these request entries will be retried indefinitely, always causing the
* same error.
*/
protected void requeueFailedRequestEntry(RequestEntryT requestEntry) {
bufferedRequests.offerFirst(requestEntry);
}
/**
* In-flight requests may fail, but they will be retried if the sink is
* still healthy.
* <p>
* To avoid losing any requests, there cannot be any outstanding in-flight
* requests when a commit is initiated. To this end, all in-flight
* requests need to be completed as part of the pre-commit.
*/
@Override
public List<Collection<CompletableFuture<?>>> prepareCommit(boolean flush) throws IOException {
// reuse the current inFlightRequests as the committable and create an empty
// queue to avoid copying and clearing
List<Collection<CompletableFuture<?>>> committable = Collections.singletonList(inFlightRequests);
// all in-flight requests are handled by the AsyncSinkCommiter and new
// elements cannot be added to the queue during a commit, so it's safe to
// create a new queue
inFlightRequests = new ConcurrentLinkedQueue<>();
return committable;
}
/**
* All in-flight requests have been completed, but there may still be
* request entries in the internal buffer that are yet to be sent to the
* endpoint. These request entries are stored in the snapshot state so that
* they don't get lost in case of a failure/restart of the application.
*/
@Override
public List<Collection<RequestEntryT>> snapshotState() throws IOException {
return Collections.singletonList(bufferedRequests);
}
|
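
The listing above relies on two small queue idioms: new request entries are appended to the tail of the buffer while failed entries are pushed back to the head, and `prepareCommit` hands out the current in-flight queue and replaces it with a fresh one instead of copying it. The following is a minimal, Flink-free sketch of just those two idioms. The class `BufferingSketch`, the `String` request entries, and the helpers `pollBatch`, `track`, and `inFlightCount` are assumptions made for illustration and are not part of the proposed interface.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the writer's buffering logic; not the real sink. */
class BufferingSketch {
    // Request entries are plain strings here to keep the sketch self-contained.
    private final Deque<String> bufferedRequests = new ConcurrentLinkedDeque<>();
    private Queue<CompletableFuture<?>> inFlightRequests = new ConcurrentLinkedQueue<>();

    /** New entries are appended to the tail of the buffer. */
    void write(String requestEntry) {
        bufferedRequests.offerLast(requestEntry);
    }

    /** Transiently failed entries go back to the head, so they are retried first. */
    void requeueFailedRequestEntry(String requestEntry) {
        bufferedRequests.offerFirst(requestEntry);
    }

    /** Hypothetical helper: removes up to maxEntries entries from the head. */
    List<String> pollBatch(int maxEntries) {
        List<String> batch = new ArrayList<>();
        String entry;
        while (batch.size() < maxEntries
                && (entry = bufferedRequests.pollFirst()) != null) {
            batch.add(entry);
        }
        return batch;
    }

    /** Hypothetical helper: registers an async call as in flight. */
    void track(CompletableFuture<?> request) {
        inFlightRequests.add(request);
    }

    /** Hands out the current in-flight queue and starts a fresh, empty one. */
    Collection<CompletableFuture<?>> prepareCommit() {
        Collection<CompletableFuture<?>> committable = inFlightRequests;
        inFlightRequests = new ConcurrentLinkedQueue<>();
        return committable;
    }

    int inFlightCount() {
        return inFlightRequests.size();
    }

    public static void main(String[] args) {
        BufferingSketch sink = new BufferingSketch();
        sink.write("a");
        sink.write("b");
        sink.write("c");

        System.out.println(sink.pollBatch(2));   // [a, b]
        sink.requeueFailedRequestEntry("b");     // pretend "b" failed transiently
        System.out.println(sink.pollBatch(2));   // [b, c]

        sink.track(new CompletableFuture<Void>());
        Collection<CompletableFuture<?>> committable = sink.prepareCommit();
        System.out.println(committable.size());  // 1
        System.out.println(sink.inFlightCount()); // 0
    }
}
```

Pushing retried entries to the head keeps them ahead of newer entries, which limits how far a retry can be delayed. Swapping the queue reference avoids a copy, but it is only safe under the assumption stated in the listing, namely that no new in-flight requests are added while a commit is being prepared.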
...