Status
Current state: Accepted
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/202106.mbox/%3CC83F4222-4D07-412D-9BD5-DB92D59DDF03%40amazon.de%3E
...
```java
/**
 * The ElementConverter provides a mapping from the elements of a
 * stream to request entries that can be sent to the destination.
 * <p>
 * The resulting request entry is buffered by the AsyncSinkWriter and sent
 * to the destination when the {@code submitRequestEntries} method is
 * invoked.
 */
private final ElementConverter<InputT, RequestEntryT> elementConverter;

/**
 * Buffer to hold request entries that should be persisted into the
 * destination.
 * <p>
 * A request entry contains all relevant details to make a call to the
 * destination. Eg, for Kinesis Data Streams a request entry contains the
 * payload and partition key.
 * <p>
 * It seems more natural to buffer InputT, ie, the events that should be
 * persisted, rather than RequestEntryT. However, in practice, the response
 * of a failed request call can make it very hard, if not impossible, to
 * reconstruct the original event. It is much easier to just construct a
 * new (retry) request entry from the response and add that back to the
 * queue for later retry.
 */
private final BlockingDeque<RequestEntryT> bufferedRequestEntries =
        new LinkedBlockingDeque<>(...);

/**
 * Tracks all pending async calls that have been executed since the last
 * checkpoint. Calls that have already completed (successfully or
 * unsuccessfully) automatically decrement the counter. Any request entry
 * that was not successfully persisted needs to be handled and retried by
 * the logic in {@code submitRequestsToApi}.
 * <p>
 * There is a limit on the number of concurrent (async) requests that can be
 * handled by the client library. This limit is enforced by checking this
 * counter before issuing new requests.
 * <p>
 * To complete a checkpoint, we need to make sure that no requests are in
 * flight, as they may fail, which could then lead to data loss.
 */
private AtomicLong numberOfInFlightRequests;

@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
    // blocks if too many events have been buffered
    bufferedRequestEntries.putLast(elementConverter.apply(element, context));

    // blocks if too many async requests are in flight
    if (/*buffering hints are met*/) {
        flush();
    }
}

/**
 * The entire request may fail or single request entries that are part of
 * the request may not be persisted successfully, eg, because of network
 * issues or service side throttling. All request entries that failed with
 * transient failures need to be re-queued with this method so that they
 * aren't lost and can be retried later.
 * <p>
 * Request entries that are causing the same error in a reproducible manner,
 * eg, ill-formed request entries, must not be re-queued but the error needs
 * to be handled in the logic of {@code submitRequestEntries}. Otherwise
 * these request entries will be retried indefinitely, always causing the
 * same error.
 */
protected void requeueFailedRequestEntry(RequestEntryT requestEntry) throws InterruptedException {
    bufferedRequestEntries.putFirst(requestEntry);
}

/**
 * Persists buffered RequestEntries into the destination by invoking {@code
 * submitRequestEntries} with batches according to the user specified
 * buffering hints.
 *
 * The method blocks if too many async requests are in flight.
 */
private void flush() throws InterruptedException {
    while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
        // create a batch of request entries that should be persisted in the destination
        ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

        for (int i = 0; i < MAX_BATCH_SIZE; i++) {
            try {
                batch.add(bufferedRequestEntries.remove());
            } catch (NoSuchElementException e) {
                // if there are not enough elements, just create a smaller batch
                break;
            }
        }

        ResultFuture<?> requestResult = ...

        if (numberOfInFlightRequests.getAndIncrement() >= MAX_IN_FLIGHT_REQUESTS) {
            // block and wait until enough in-flight requests have completed
        }

        // call the destination specific code that actually persists the request entries
        submitRequestEntries(batch, requestResult);
    }
}

/**
 * In-flight requests will be retried if the sink is still healthy. But if
 * in-flight requests fail after a checkpoint has been triggered and Flink
 * needs to recover from the checkpoint, the (failed) in-flight requests are
 * gone and cannot be retried. Hence, there cannot be any outstanding
 * in-flight requests when a commit is initialized.
 * <p>
 * To this end, all in-flight requests need to be passed to the {@code
 * AsyncSinkCommiter} in order to be completed as part of the pre-commit.
 */
@Override
public List<AtomicLong> prepareCommit(boolean flush) throws IOException {
    logger.info("Prepare commit. {} requests currently in flight.", numberOfInFlightRequests.get());

    // reuse the current counter as committable and create an empty one to avoid copying and clearing
    List<AtomicLong> committable = Collections.singletonList(numberOfInFlightRequests);

    // all in-flight requests are handled by the AsyncSinkCommiter and new elements cannot be
    // added to the queue during a commit, so it's safe to create a new counter
    numberOfInFlightRequests = new AtomicLong();

    return committable;
}

/**
 * All in-flight requests have been completed, but there may still be
 * request entries in the internal buffer that are yet to be sent to the
 * endpoint. These request entries are stored in the snapshot state so that
 * they don't get lost in case of a failure/restart of the application.
 */
@Override
public List<Collection<RequestEntryT>> snapshotState() throws IOException {
    return Collections.singletonList(bufferedRequestEntries);
}
```
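To make the ElementConverter contract above concrete, the following is a minimal, self-contained sketch of a converter that maps String events to a destination-specific request entry. Note this is illustrative only: the ElementConverter and Context interfaces here are simplified stand-ins for the actual Flink types, and RequestEntry stands in for a real destination type such as a Kinesis PutRecords entry carrying a payload and partition key.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for the sink's write context (the real Context
// carries timestamps and watermarks; omitted here for brevity).
interface Context {}

// Simplified stand-in for the ElementConverter contract described above:
// a pure mapping from a stream element to a destination request entry.
@FunctionalInterface
interface ElementConverter<InputT, RequestEntryT> {
    RequestEntryT apply(InputT element, Context context);
}

// Hypothetical request entry, modeled after what the Javadoc describes
// for Kinesis Data Streams: a payload plus a partition key.
class RequestEntry {
    final byte[] payload;
    final String partitionKey;

    RequestEntry(byte[] payload, String partitionKey) {
        this.payload = payload;
        this.partitionKey = partitionKey;
    }
}

public class ElementConverterExample {
    public static void main(String[] args) {
        // Map each String event to a request entry; in this toy example
        // the event itself doubles as the partition key.
        ElementConverter<String, RequestEntry> converter =
                (event, ctx) -> new RequestEntry(
                        event.getBytes(StandardCharsets.UTF_8), event);

        RequestEntry entry = converter.apply("order-42", null);
        System.out.println(entry.partitionKey); // prints order-42
    }
}
```

Because the converter only sees one element at a time and returns a ready-to-send request entry, the AsyncSinkWriter can buffer RequestEntryT values directly, which (as the Javadoc above argues) makes retrying failed entries straightforward.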
...