THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Buffer to hold request entries that should be persisted into the * destination. * <p> * A request entry contain all relevant details to make a call to the * destination. Eg, for Kinesis Data Streams a request entry contains the * payload and partition key. * <p> * It seems more natural to buffer InputT, ie, the events that should be * persisted, rather than RequestEntryT. However, in practice, the response * of a failed request call can make it very hard, if not impossible, to * reconstruct the original event. It is much easier, to just construct a * new (retry) request entry from the response and add that back to the * queue for later retry. */ private final Deque<RequestEntryT> bufferedRequests = new ConcurrentLinkedDeque<>(); /** * Tracks all pending async calls that have been executed since the last * checkpoint. Calls that already completed (successfully or unsuccessfully) * are automatically removed from the queue. Any request entry that was not * successfully persisted need to be handled and retried by the logic in * {@code submitRequestsToApi}. * <p> * There is a limit on the number of concurrent (async) requests that can be * handled by the client library. This limit must be checked before issunin * <p> * To complete a checkpoint, we need to make sure that no requests are in * flight, as they may fail, which could then lead to data loss. */ private Queue<CompletableFuture<?>> inFlightRequests = new ConcurrentLinkedQueue<>(); /** * Signals if enough RequestEntryTs have been buffered according to the user * specified buffering hints to make a request against the destination. This * functionality will be added to the sink interface by means of an * additional FLIP. * * @return a future that will be completed once there is are record available * to make a request against the destination */ public CompletableFuture<Void> isAvailable() { ... } @Override public void write(InputT element, Context context) throws IOException { bufferedRequests.offerLast(elementConverter.apply(element, context)); } /** * AThe entire request may fail or single request entries that are part of a * the request may fail not be persisted successfully, eg, because of network * network issues or service side throttling. All request entries that failed with * failed with transient failures need to be re-queued with this method so that aren't * that aren't lost and can be retried later. * <p> * Request entries that are causing the same error in a reproducible manner, * eg, ill-formed request entries, must not be re-queued but the error needs * to be handled in the logic of {@code submitRequestEntries}. Otherwise * these request entries will be retried indefinitely, always causing the * same error. */ protected void requeueFailedRequestEntry(RequestEntryT requestEntry) { bufferedRequests.offerFirst(requestEntry); } /** * In-flight requests may fail, but they will be retried if the sink is * still healthy. * <p> * To not lose any requests, there cannot be any outstanding in-flight * requests when a commit is initialized. To this end, all in-flight * requests need to be completed as part of the pre commit. */ @Override public List<Collection<CompletableFuture<?>>> prepareCommit(boolean flush) throws IOException { // reuse current inFlightRequests as commitable and create an empty queue // to avoid copy and clearing List<Collection<CompletableFuture<?>>> committable = Collections.singletonList(inFlightRequests); // all in-flight requests are handled by the AsyncSinkCommiter and new // elements cannot be added to the queue during a commit, so it's save to // create a new queue inFlightRequests = new ConcurrentLinkedQueue<>(); return committable; } /** * All in-flight requests have been completed, but there may still be * request entries in the internal buffer that are yet to be sent to the * endpoint. These request entries are stored in the snapshot state so that * they don't get lost in case of a failure/restart of the application. */ @Override public List<Collection<RequestEntryT>> snapshotState() throws IOException { return Collections.singletonList(bufferedRequests); } |
...