Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/hmh2xoy34jm4r81txm3x1wv27d6vnpkw

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Async Sink was introduced as part of FLIP-171. The AsyncSinkWriter is the main component responsible for converting sink records into downstream requests, buffering those requests, and sending them downstream.
The current implementation defines three triggers for flushing buffered requests:

  • The buffer size reaching maxBatchSize
  • The buffer size in bytes reaching maxBatchSizeInBytes
  • The first queued record staying in the buffer for longer than maxTimeInBufferMS


During the implementation of the DynamoDB (DDB) sink, extracting the size of a record in bytes proved to be "expensive or not possible". We therefore want to abstract the buffer flushing triggers and decouple them from AsyncSinkWriter.
We will allow sink implementers to define their own flushing triggers, since the appropriate triggers proved to be sink-dependent.


Proposed Changes

I propose the following changes:

  • Introduce AsyncSinkBuffer as an @Internal class that holds the writer's buffer, as described below.
  • Introduce the following @PublicEvolving interfaces to serve as pluggable buffer controls:
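    • BufferBlockingStrategy
      import org.apache.flink.annotation.PublicEvolving;

      /** Decides whether a new entry must wait (e.g. for a flush) before it can be buffered. */
      @PublicEvolving
      public interface BufferBlockingStrategy<RequestEntryT> {
          boolean shouldBlockBuffer(BufferedRequestState<RequestEntryT> bufferState,
                                    RequestEntryT newEntry);
      }
      BufferFlushTrigger
      import java.io.IOException;

      import org.apache.flink.annotation.PublicEvolving;

      /** Observes additions to and removals from the buffer and invokes the registered flush actions. */
      @PublicEvolving
      public interface BufferFlushTrigger<RequestEntryT> {
          void registerFlushAction(AsyncSinkBufferFlushAction flushAction);

          void notifyAddRequest(RequestEntryT requestAdded)
                  throws InterruptedException, IOException;

          void notifyRemoveRequest(RequestEntryT requestRemoved);
      }
      import java.io.IOException;

      import org.apache.flink.annotation.Internal;

      /** Callback a trigger invokes to request a flush from the writer. */
      @Internal
      @FunctionalInterface
      public interface AsyncSinkBufferFlushAction {
          void triggerFlush() throws InterruptedException, IOException;
      }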
  • Refactor AsyncSinkWriter methods to delegate buffer responsibilities to the new buffer, with the following changes:
    • AsyncSinkWriter changes to nonBlockingFlush
      private void nonBlockingFlush() throws InterruptedException {
          // Flush while the rate-limiting strategy allows it; stop once the buffer is empty,
          // since flush() on an empty buffer leaves the rate limiter unchanged and would loop forever.
          while (buffer.size() > 0 && !rateLimitingStrategy.shouldBlock(createRequestInfo())) {
              flush();
          }
      }
      AsyncSinkWriter changes to write
      @Override
      public void write(InputT element, Context context) throws IOException, InterruptedException {
          RequestEntryT requestEntry = elementConverter.apply(element, context);
          // Flush until every blocking strategy accepts the new entry.
          while (!buffer.canAddRequestEntry(requestEntry)) {
              flush();
          }

          buffer.add(requestEntry, false);
      }
      AsyncSinkWriter changes to createNextAvailableBatch
      private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
          List<RequestEntryT> batch = buffer.removeBatch(requestInfo.getBatchSize());
          numRecordsOutCounter.inc(batch.size());
          return batch;
      }


    • Remove the private method registerCallback from AsyncSinkWriter (its timer logic moves into TimeBasedBufferFlushTrigger).
    • The constructor will create the flush triggers and blocking strategies and pass them to the buffer as follows:
    • AsyncSinkWriter constructor
      BufferSizeInBytesFlushTrigger<RequestEntryT> bufferedRequestStateSizeInBytesFlushTrigger =
              new BufferSizeInBytesFlushTrigger<RequestEntryT>(maxBatchSizeInBytes) {
                  @Override
                  protected int getSizeOfEntry(RequestEntryT addedEntry) {
                      return (int) getSizeInBytes(addedEntry);
                  }
              };
      // Size- and count-based triggers use nonBlockingFlush, so they never wait on the rate limiter.
      bufferedRequestStateSizeInBytesFlushTrigger.registerFlushAction(this::nonBlockingFlush);
      BatchSizeFlushTrigger<RequestEntryT> batchSizeFlushTrigger =
              new BatchSizeFlushTrigger<>(maxBatchSize);
      batchSizeFlushTrigger.registerFlushAction(this::nonBlockingFlush);
      TimeBasedBufferFlushTrigger<RequestEntryT> timeBasedBufferFlushTrigger =
              new TimeBasedBufferFlushTrigger<>(
                      context.getProcessingTimeService(), maxTimeInBufferMS);
      timeBasedBufferFlushTrigger.registerFlushAction(this::flush);
      this.buffer =
              new AsyncSinkBuffer<>(
                      Collections.singletonList(
                              new BufferSizeBlockingStrategy<>(maxBufferedRequests)),
                      Arrays.asList(
                              bufferedRequestStateSizeInBytesFlushTrigger,
                              batchSizeFlushTrigger,
                              timeBasedBufferFlushTrigger));
  • The AsyncSinkBuffer class will manage the buffer using the assigned control strategies, following the proposed template:
    AsyncSinkBuffer
    import org.apache.flink.annotation.Internal;

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;

    @Internal
    public class AsyncSinkBuffer<RequestEntryT extends Serializable> {
        protected final ArrayDeque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();
        protected final List<BufferBlockingStrategy<RequestEntryT>> bufferBlockingStrategies;
        protected final List<BufferFlushTrigger<RequestEntryT>> bufferFlushTriggers;

        public AsyncSinkBuffer(
                List<BufferBlockingStrategy<RequestEntryT>> bufferBlockingStrategies,
                List<BufferFlushTrigger<RequestEntryT>> bufferFlushTriggers) {
            this.bufferBlockingStrategies = bufferBlockingStrategies;
            this.bufferFlushTriggers = bufferFlushTriggers;
        }

        public void add(RequestEntryT entry, boolean atHead)
                throws InterruptedException, IOException {
            if (atHead) {
                bufferedRequestEntries.addFirst(entry);
            } else {
                bufferedRequestEntries.add(entry);
            }
            notifyAllFlushTriggersOnAdd(entry);
        }

        public int size() {
            return bufferedRequestEntries.size();
        }

        /** Returns true if no blocking strategy objects to adding the entry. */
        public boolean canAddRequestEntry(RequestEntryT requestEntry) {
            BufferedRequestState<RequestEntryT> bufferState = getBufferState();
            return bufferBlockingStrategies.stream()
                    .noneMatch(strategy -> strategy.shouldBlockBuffer(bufferState, requestEntry));
        }

        /** Removes up to batchSize entries from the head of the buffer. */
        public List<RequestEntryT> removeBatch(int batchSize) {
            int entriesToRemove = Math.min(batchSize, bufferedRequestEntries.size());
            List<RequestEntryT> nextBatch = new ArrayList<>(entriesToRemove);
            for (int i = 0; i < entriesToRemove; i++) {
                RequestEntryT requestEntry = bufferedRequestEntries.remove();
                notifyAllFlushTriggersOnRemove(requestEntry);
                nextBatch.add(requestEntry);
            }
            return nextBatch;
        }

        public BufferedRequestState<RequestEntryT> getBufferState() {
            return new BufferedRequestState<>(this.bufferedRequestEntries);
        }

        private void notifyAllFlushTriggersOnAdd(RequestEntryT requestEntry)
                throws InterruptedException, IOException {
            for (BufferFlushTrigger<RequestEntryT> trigger : bufferFlushTriggers) {
                trigger.notifyAddRequest(requestEntry);
            }
        }

        private void notifyAllFlushTriggersOnRemove(RequestEntryT requestEntry) {
            for (BufferFlushTrigger<RequestEntryT> trigger : bufferFlushTriggers) {
                trigger.notifyRemoveRequest(requestEntry);
            }
        }
    }
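To make the interplay between the buffer, a blocking strategy and a flush trigger concrete, here is a minimal standalone Java sketch. It assumes simplified, annotation-free versions of the proposed interfaces; the blocking strategy receiving only the current entry count, and the class name SimpleAsyncSinkBuffer, are hypothetical simplifications for illustration, not part of the proposal. In a real writer the flush action would be nonBlockingFlush; here it simply removes a batch from the buffer.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified, annotation-free stand-ins for the proposed interfaces (sketch only).
interface AsyncSinkBufferFlushAction {
    void triggerFlush() throws InterruptedException, IOException;
}

interface BufferFlushTrigger<T> {
    void registerFlushAction(AsyncSinkBufferFlushAction flushAction);
    void notifyAddRequest(T requestAdded) throws InterruptedException, IOException;
    void notifyRemoveRequest(T requestRemoved);
}

// Hypothetical simplification: the strategy sees only the current entry count, not a full buffer state.
interface BufferBlockingStrategy<T> {
    boolean shouldBlockBuffer(int bufferedEntries, T newEntry);
}

// Count-based trigger: requests a flush whenever maxBatchSize entries are buffered.
class BatchSizeFlushTrigger<T> implements BufferFlushTrigger<T> {
    private final int maxBatchSize;
    private final List<AsyncSinkBufferFlushAction> flushActions = new ArrayList<>();
    private int bufferedEntries;

    BatchSizeFlushTrigger(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void registerFlushAction(AsyncSinkBufferFlushAction flushAction) {
        flushActions.add(flushAction);
    }

    @Override
    public void notifyAddRequest(T requestAdded) throws InterruptedException, IOException {
        bufferedEntries++;
        if (bufferedEntries >= maxBatchSize) {
            for (AsyncSinkBufferFlushAction action : flushActions) {
                action.triggerFlush();
            }
        }
    }

    @Override
    public void notifyRemoveRequest(T requestRemoved) {
        bufferedEntries--;
    }
}

// Hypothetical standalone analogue of AsyncSinkBuffer: stores entries and keeps every trigger informed.
class SimpleAsyncSinkBuffer<T> {
    private final ArrayDeque<T> entries = new ArrayDeque<>();
    private final List<BufferBlockingStrategy<T>> blockingStrategies;
    private final List<BufferFlushTrigger<T>> flushTriggers;

    SimpleAsyncSinkBuffer(
            List<BufferBlockingStrategy<T>> blockingStrategies,
            List<BufferFlushTrigger<T>> flushTriggers) {
        this.blockingStrategies = blockingStrategies;
        this.flushTriggers = flushTriggers;
    }

    boolean canAddRequestEntry(T entry) {
        return blockingStrategies.stream()
                .noneMatch(s -> s.shouldBlockBuffer(entries.size(), entry));
    }

    void add(T entry, boolean atHead) throws InterruptedException, IOException {
        if (atHead) {
            entries.addFirst(entry);
        } else {
            entries.addLast(entry);
        }
        for (BufferFlushTrigger<T> trigger : flushTriggers) {
            trigger.notifyAddRequest(entry);
        }
    }

    // Removes at most batchSize entries, so a short or empty buffer never throws.
    List<T> removeBatch(int batchSize) {
        int count = Math.min(batchSize, entries.size());
        List<T> batch = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            T entry = entries.remove();
            for (BufferFlushTrigger<T> trigger : flushTriggers) {
                trigger.notifyRemoveRequest(entry);
            }
            batch.add(entry);
        }
        return batch;
    }

    int size() {
        return entries.size();
    }
}
```

Because removeBatch notifies every trigger, the count-based trigger resets itself when a flush drains the buffer, and an entry added at the head is the first to leave in the next batch.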

Proposed changes to the public interfaces are:

  • Mark the abstract method getSizeInBytes in AsyncSinkWriter as @Deprecated.
  • Deprecate the current constructor of AsyncSinkWriterConfiguration and add a new constructor that drops maxBatchSizeInBytes and maxRecordSizeInBytes.

  • Add the new @PublicEvolving interfaces BufferBlockingStrategy and BufferFlushTrigger with the aforementioned templates.

  • Add a new constructor for AsyncSinkWriter that takes a List<BufferBlockingStrategy> and a List<BufferFlushTrigger> as parameters, so that implementers can supply their own buffer controls.

  • Implement default BatchSizeFlushTrigger, TimeBasedBufferFlushTrigger and BufferSizeInBytesFlushTrigger classes, which contain the current flush trigger logic, and pass them to the buffer from all existing constructors to maintain backward compatibility.

  • Implement a default BufferSizeBlockingStrategy (limiting the buffer to maxBufferedRequests) and pass it to the buffer from all existing constructors to maintain backward compatibility.
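As an illustration of the default byte-size trigger listed above, the following Java sketch keeps a running total of buffered bytes and delegates entry sizing to a subclass, mirroring the anonymous subclass in the proposed AsyncSinkWriter constructor. It rests on assumptions: the interfaces are simplified copies without Flink annotations, and "flush once the running total reaches maxBatchSizeInBytes" is a plausible reading of the current behaviour, not the final implementation.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed interfaces (no Flink annotations).
interface AsyncSinkBufferFlushAction {
    void triggerFlush() throws InterruptedException, IOException;
}

interface BufferFlushTrigger<RequestEntryT> {
    void registerFlushAction(AsyncSinkBufferFlushAction flushAction);
    void notifyAddRequest(RequestEntryT requestAdded) throws InterruptedException, IOException;
    void notifyRemoveRequest(RequestEntryT requestRemoved);
}

// Byte-size trigger sketch: tracks the bytes currently buffered and requests a flush
// once the total reaches maxBatchSizeInBytes.
abstract class BufferSizeInBytesFlushTrigger<RequestEntryT>
        implements BufferFlushTrigger<RequestEntryT> {
    private final long maxBatchSizeInBytes;
    private final List<AsyncSinkBufferFlushAction> flushActions = new ArrayList<>();
    private long bufferedBytes;

    protected BufferSizeInBytesFlushTrigger(long maxBatchSizeInBytes) {
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    // Supplied by the sink, e.g. by delegating to the deprecated getSizeInBytes.
    protected abstract int getSizeOfEntry(RequestEntryT entry);

    @Override
    public void registerFlushAction(AsyncSinkBufferFlushAction flushAction) {
        flushActions.add(flushAction);
    }

    @Override
    public void notifyAddRequest(RequestEntryT requestAdded)
            throws InterruptedException, IOException {
        bufferedBytes += getSizeOfEntry(requestAdded);
        if (bufferedBytes >= maxBatchSizeInBytes) {
            for (AsyncSinkBufferFlushAction action : flushActions) {
                action.triggerFlush();
            }
        }
    }

    @Override
    public void notifyRemoveRequest(RequestEntryT requestRemoved) {
        bufferedBytes -= getSizeOfEntry(requestRemoved);
    }
}
```

A sink that cannot size its entries cheaply, such as the DDB sink, would simply leave this trigger out of the list passed to the new AsyncSinkWriter constructor.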

Compatibility, Deprecation, and Migration Plan

  • There will be no impact on existing users of the Async Sink, since the behaviour is unchanged.
  • There is no deprecation plan for existing behavior, since we maintain backward compatibility.
  • We will apply the corresponding changes to the flink-aws-connectors repo, which contains the KDS, KDF and DDB sinks, and follow up with a connector release after the Flink release.

Test Plan

Modifying the existing AsyncSinkWriter unit tests should be sufficient to cover the writer refactoring.

We will also add unit tests for AsyncSinkBuffer and for all implemented blocking strategies and flush triggers.
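Time-based triggers are the hardest of these to unit test against wall-clock time. One option is to make time explicit, as in the following Java sketch. It assumes a hypothetical injectable LongSupplier clock and a hypothetical onProcessingTime(long) callback that stands in for a timer registered with Flink's ProcessingTimeService; the interfaces are simplified copies of the proposed ones.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified stand-ins for the proposed interfaces (no Flink annotations).
interface AsyncSinkBufferFlushAction {
    void triggerFlush() throws InterruptedException, IOException;
}

interface BufferFlushTrigger<RequestEntryT> {
    void registerFlushAction(AsyncSinkBufferFlushAction flushAction);
    void notifyAddRequest(RequestEntryT requestAdded) throws InterruptedException, IOException;
    void notifyRemoveRequest(RequestEntryT requestRemoved);
}

// Time-based trigger with an injectable clock (hypothetical design). onProcessingTime(now)
// plays the role of a processing-time timer callback, so a test can advance time explicitly.
class TimeBasedBufferFlushTrigger<RequestEntryT> implements BufferFlushTrigger<RequestEntryT> {
    private final LongSupplier clock;
    private final long maxTimeInBufferMS;
    private final List<AsyncSinkBufferFlushAction> flushActions = new ArrayList<>();
    private boolean timerArmed;
    private long deadline;

    TimeBasedBufferFlushTrigger(LongSupplier clock, long maxTimeInBufferMS) {
        this.clock = clock;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
    }

    @Override
    public void registerFlushAction(AsyncSinkBufferFlushAction flushAction) {
        flushActions.add(flushAction);
    }

    @Override
    public void notifyAddRequest(RequestEntryT requestAdded) {
        // Arm a single timer when an entry arrives and none is pending.
        if (!timerArmed) {
            timerArmed = true;
            deadline = clock.getAsLong() + maxTimeInBufferMS;
        }
    }

    @Override
    public void notifyRemoveRequest(RequestEntryT requestRemoved) {
        // This sketch keeps the timer armed; the flush action must cope with an empty buffer.
    }

    // Fires the flush actions once the deadline has passed, then disarms the timer.
    void onProcessingTime(long now) throws InterruptedException, IOException {
        if (timerArmed && now >= deadline) {
            timerArmed = false;
            for (AsyncSinkBufferFlushAction action : flushActions) {
                action.triggerFlush();
            }
        }
    }
}
```

With the clock injected, a test can add entries at chosen timestamps and assert that the flush fires exactly at the deadline and not before.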


Rejected Alternatives

N/A