THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * The ElementConverter provides a mapping between for the elements of a * stream to request entries that can be sent to the destination. * <p> * The resulting request entry is buffered by the AsyncSinkWriter and sent * to the destination when the {@code submitRequestEntries} method is * invoked. */ private final ElementConverter<InputT, RequestEntryT> elementConverter; /** * Buffer to hold request entries that should be persisted into the * destination. * <p> * A request entry contain all relevant details to make a call to the * destination. Eg, for Kinesis Data Streams a request entry contains the * payload and partition key. * <p> * It seems more natural to buffer InputT, ie, the events that should be * persisted, rather than RequestEntryT. However, in practice, the response * of a failed request call can make it very hard, if not impossible, to * reconstruct the original event. It is much easier, to just construct a * new (retry) request entry from the response and add that back to the * queue for later retry. */ private final Deque<RequestEntryT>BlockingDeque<RequestEntryT> bufferedRequestEntries = new ConcurrentLinkedDeque<>LinkedBlockingDeque<>(...); /** * Tracks all pending async calls that have been executed since the last * checkpoint. Calls that already completed (successfully or unsuccessfully) * are automatically removed from the queue. Any request entry that was not * successfully persisted need to be handled and retried by the logic in * {@code submitRequestsToApi}. * <p> * There is a limit on the number of concurrent (async) requests that can be * handled by the client library. This limit is enforced by checking the * size of this queue before issuing new requests. * <p> * To complete a checkpoint, we need to make sure that no requests are in * flight, as they may fail, which could then lead to data loss. */ private Queue<CompletableFuture<BlockingDeque<CompletableFuture<?>> inFlightRequests = new ConcurrentLinkedQueue<>LinkedBlockingDeque<>(...); @Override public void write(InputT element, Context context) throws IOException, InterruptedException { bufferedRequestEntries.putLast(elementConverter.apply(element, context)); } /** * The entire request may fail or single request entries that are part of * the request may not be persisted successfully, eg, because of network * issues or service side throttling. All request entries that failed with * transient failures need to be re-queued with this method so that aren't * lost and can be retried later. * <p> * Request entries that are causing the same error in a reproducible manner, * eg, ill-formed request entries, must not be re-queued but the error needs * to be handled in the logic of {@code submitRequestEntries}. Otherwise * these request entries will be retried indefinitely, always causing the * same error. */ protected void requeueFailedRequestEntry(RequestEntryT requestEntry) throws InterruptedException { bufferedRequestEntries.offerFirst(requestEntry);putFirst(requestEntry); } /** * Persists buffered RequestsEntries into the destination by invoking {@code * submitRequestEntries} with batches according to the user specified * buffering hints. */ public void flush() throws InterruptedException { while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) { // create a batch of request entries that should be persisted in the destination ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE); for (int i = 0; i < MAX_BATCH_SIZE; i++) { try { batch.add(bufferedRequestEntries.remove()); } catch (NoSuchElementException e) { // if there are not enough elements, just create a smaller batch break; } } logger.info("submit requests for {} elements", batch.size()); // call the destination specific code that actually persists the request entries CompletableFuture<?> future = submitRequestEntries(batch); // keep track of in flight request inFlightRequests.put(future); // remove the request from the tracking queue once it competed future.whenComplete((response, err) -> { inFlightRequests.remove(future); }); } } /** * In-flight requests may fail, but they will be retried if the sink is * still healthy. * <p> * To not lose any requests, there cannot be any outstanding in-flight * requests when a commit is initialized. To this end, all in-flight * requests need to be completed as part of the pre commit. */ @Override public List<Collection<CompletableFuture<?>>> prepareCommit(boolean flush) throws IOException { // reuse current inFlightRequests as commitable and create an empty queue // to avoid copy and clearing List<Collection<CompletableFuture<?>>> committable = Collections.singletonList(inFlightRequests); // all in-flight requests are handled by the AsyncSinkCommiter and new // elements cannot be added to the queue during a commit, so it's save to // create a new queue inFlightRequests = new ConcurrentLinkedQueue<>(); return committable; } /** * All in-flight requests have been completed, but there may still be * request entries in the internal buffer that are yet to be sent to the * endpoint. These request entries are stored in the snapshot state so that * they don't get lost in case of a failure/restart of the application. */ @Override public List<Collection<RequestEntryT>> snapshotState() throws IOException { return Collections.singletonList(bufferedRequestEntries); } |
...