Status
Current state: Under Discussion
Discussion thread: TBD
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Async Sink was introduced as part of FLIP-171. The Async Sink writer is the main component responsible for converting sink records to downstream requests, buffering those requests, and sending them downstream.
The current implementation defines 3 triggers for flushing buffered requests:
- Buffer size reaching maxBatchSize
- Buffer size in bytes reaching maxBatchSizeInBytes
- First queued record staying in the buffer for more than maxTimeInBufferMS
During the implementation of the DDB sink, extracting the record size in bytes proved to be "expensive or not possible". Hence we want to abstract the buffer flushing triggers and decouple them from AsyncSinkWriter.
We will allow sink implementers to define their own flushing triggers, since the appropriate triggers proved to be sink dependent.
Proposed Changes
I propose the following changes:
- Introducing AsyncSinkBuffer as an @Internal class which will hold the buffer for the writer, as introduced below.
- Introducing the following @PublicEvolving interfaces to serve as the pluggable buffer controls.
- BufferBlockingStrategy

    @PublicEvolving
    public interface BufferBlockingStrategy<RequestEntryT> {
        public boolean shouldBlockBuffer(BufferState<RequestEntryT> bufferState, RequestEntryT newEntry);
    }

- BufferFlushTrigger

    @PublicEvolving
    public interface BufferFlushTrigger<RequestEntryT> {
        public void registerFlushAction(AsyncSinkBufferFlushAction flushAction);

        public void notifyAddRequest(RequestEntryT requestAdded)
                throws InterruptedException, IOException;

        public void notifyRemoveRequest(RequestEntryT requestRemoved);
    }

    @Internal
    @FunctionalInterface
    public interface AsyncSinkBufferFlushAction {
        void triggerFlush() throws InterruptedException, IOException;
    }
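As an illustration of the trigger contract above, a count-based trigger could look roughly like the following. This is a minimal sketch rather than the proposed implementation: the two interfaces are copied locally (without the Flink annotations) so the snippet compiles on its own, and the internal counter and the no-op default action are assumptions.

```java
import java.io.IOException;

// Local copies of the proposed interfaces so the sketch is self-contained.
@FunctionalInterface
interface AsyncSinkBufferFlushAction {
    void triggerFlush() throws InterruptedException, IOException;
}

interface BufferFlushTrigger<RequestEntryT> {
    void registerFlushAction(AsyncSinkBufferFlushAction flushAction);

    void notifyAddRequest(RequestEntryT requestAdded) throws InterruptedException, IOException;

    void notifyRemoveRequest(RequestEntryT requestRemoved);
}

// Assumed implementation: fires the flush action once maxBatchSize entries are buffered.
class BatchSizeFlushTrigger<RequestEntryT> implements BufferFlushTrigger<RequestEntryT> {
    private final int maxBatchSize;
    private int bufferedCount = 0;
    // No-op until a real action is registered (an assumption of this sketch).
    private AsyncSinkBufferFlushAction flushAction = () -> {};

    BatchSizeFlushTrigger(int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void registerFlushAction(AsyncSinkBufferFlushAction flushAction) {
        this.flushAction = flushAction;
    }

    @Override
    public void notifyAddRequest(RequestEntryT requestAdded)
            throws InterruptedException, IOException {
        bufferedCount++;
        if (bufferedCount >= maxBatchSize) {
            flushAction.triggerFlush();
        }
    }

    @Override
    public void notifyRemoveRequest(RequestEntryT requestRemoved) {
        // Guard against going negative if remove is notified more often than add.
        bufferedCount = Math.max(0, bufferedCount - 1);
    }
}
```

Because this trigger only keeps a counter, it never needs the size of an entry in bytes, which is the property that matters for sinks such as DDB.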
- Refactoring AsyncSinkWriter methods to delegate buffer responsibilities to the new buffer, through the following changes:
- Changes to nonBlockingFlush in AsyncSinkWriter

    private void nonBlockingFlush() throws InterruptedException {
        while (!rateLimitingStrategy.shouldBlock(createRequestInfo())) {
            flush();
        }
    }

- Changes to write in AsyncSinkWriter

    public void write(InputT element, Context context) throws IOException, InterruptedException {
        RequestEntryT requestEntry = elementConverter.apply(element, context);
        while (!buffer.canAddRequestEntry(requestEntry)) {
            flush();
        }
        buffer.add(requestEntry, false);
    }

- Changes to createNextAvailableBatch in AsyncSinkWriter

    private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
        List<RequestEntryT> batch = buffer.removeBatch(requestInfo.getBatchSize());
        numRecordsOutCounter.inc(batch.size());
        return batch;
    }
- Removing the private method registerCallback from AsyncSinkWriter.
- The constructor will construct the triggers and blocking strategies and add them to the buffer as follows:
- AsyncSinkWriter constructor

    BufferSizeInBytesFlushTrigger<RequestEntryT> bufferedRequestStateSizeInBytesFlushTrigger =
            new BufferSizeInBytesFlushTrigger<RequestEntryT>(maxBatchSizeInBytes) {
                @Override
                protected int getSizeOfEntry(RequestEntryT addedEntry) {
                    return (int) getSizeInBytes(addedEntry);
                }
            };
    bufferedRequestStateSizeInBytesFlushTrigger.registerFlushAction(
            (triggerId -> nonBlockingFlush()));

    BatchSizeFlushTrigger<RequestEntryT> batchSizeFlushTrigger =
            new BatchSizeFlushTrigger<>(maxBatchSize);
    batchSizeFlushTrigger.registerFlushAction((triggerId -> nonBlockingFlush()));

    TimeBasedBufferFlushTrigger<RequestEntryT> timeBasedBufferFlushTrigger =
            new TimeBasedBufferFlushTrigger<>(
                    context.getProcessingTimeService(), maxTimeInBufferMS);
    timeBasedBufferFlushTrigger.registerFlushAction(this::flush);

    this.bufferedRequestEntries =
            new AsyncSinkBuffer<>(
                    Collections.singletonList(
                            new BufferSizeBlockingStrategy<>(maxBufferedRequests)),
                    Arrays.asList(
                            bufferedRequestStateSizeInBytesFlushTrigger,
                            batchSizeFlushTrigger,
                            timeBasedBufferFlushTrigger));
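The anonymous subclass in the constructor suggests that the byte-size trigger is an abstract class with a getSizeOfEntry hook. A rough sketch of that shape follows, assuming the trigger keeps a running byte total and using Runnable as a simplified stand-in for the flush action. The names SizeInBytesFlushTriggerSketch and Utf8StringSizeTrigger are hypothetical and not part of the proposal.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: tracks buffered bytes and fires when the limit is reached.
abstract class SizeInBytesFlushTriggerSketch<T> {
    private final long maxBatchSizeInBytes;
    private long bufferedBytes = 0L;
    // Runnable is a simplification; the proposed flush action declares checked exceptions.
    private Runnable flushAction = () -> {};

    SizeInBytesFlushTriggerSketch(long maxBatchSizeInBytes) {
        if (maxBatchSizeInBytes <= 0) {
            throw new IllegalArgumentException("maxBatchSizeInBytes must be positive");
        }
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    // Sink-specific hook: only sinks that can compute a size cheaply need this trigger.
    protected abstract int getSizeOfEntry(T entry);

    void registerFlushAction(Runnable flushAction) {
        this.flushAction = flushAction;
    }

    void notifyAddRequest(T added) {
        bufferedBytes += getSizeOfEntry(added);
        if (bufferedBytes >= maxBatchSizeInBytes) {
            flushAction.run();
        }
    }

    void notifyRemoveRequest(T removed) {
        bufferedBytes = Math.max(0L, bufferedBytes - getSizeOfEntry(removed));
    }

    long getBufferedBytes() {
        return bufferedBytes;
    }
}

// Example subclass for String entries, sized by their UTF-8 encoding.
class Utf8StringSizeTrigger extends SizeInBytesFlushTriggerSketch<String> {
    Utf8StringSizeTrigger(long maxBatchSizeInBytes) {
        super(maxBatchSizeInBytes);
    }

    @Override
    protected int getSizeOfEntry(String entry) {
        return entry.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

A sink for which size extraction is expensive would simply not register such a trigger, which is the decoupling this proposal aims for.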
The AsyncSinkBuffer class will manage the buffer using the assigned control strategies, with the proposed template:

    import java.io.Serializable;
    import java.time.Instant;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;

    @Internal
    public class AsyncSinkBuffer<RequestEntryT extends Serializable> {
        protected final ArrayDeque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();
        protected final List<AsyncSinkBufferBlockingStrategy<RequestEntryT>> bufferBlockingStrategies;
        protected final List<AsyncSinkBufferFlushTrigger<RequestEntryT>> bufferFlushTriggers;

        public AsyncSinkBuffer(
                List<AsyncSinkBufferBlockingStrategy<RequestEntryT>> bufferBlockingStrategies,
                List<AsyncSinkBufferFlushTrigger<RequestEntryT>> bufferFlushTriggers) {
            this.bufferBlockingStrategies = bufferBlockingStrategies;
            this.bufferFlushTriggers = bufferFlushTriggers;
        }

        public void add(RequestEntryT entry, boolean atHead) throws InterruptedException {
            if (atHead) {
                bufferedRequestEntries.addFirst(entry);
            } else {
                bufferedRequestEntries.add(entry);
            }
            notifyAllFlushTriggersOnAdd(entry);
        }

        public int size() {
            return bufferedRequestEntries.size();
        }

        // An entry can be added only if no blocking strategy objects.
        public boolean canAddRequestEntry(RequestEntryT requestEntry)
                throws IllegalArgumentException {
            return this.bufferBlockingStrategies.stream()
                    .noneMatch(strategy -> strategy.shouldBlock(getBufferState(), requestEntry));
        }

        public List<RequestEntryT> removeBatch(int batchSize) {
            // Never remove more entries than are currently buffered.
            int count = Math.min(batchSize, bufferedRequestEntries.size());
            List<RequestEntryT> nextBatch = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                RequestEntryT requestEntry = bufferedRequestEntries.remove();
                notifyAllFlushTriggersOnRemove(requestEntry);
                nextBatch.add(requestEntry);
            }
            return nextBatch;
        }

        public BufferedRequestState<RequestEntryT> getBufferState() {
            return new BufferedRequestState<>(this.bufferedRequestEntries);
        }

        private void notifyAllFlushTriggersOnAdd(RequestEntryT requestEntry)
                throws InterruptedException {
            long triggerId = Instant.now().toEpochMilli();
            for (AsyncSinkBufferFlushTrigger<RequestEntryT> trigger : this.bufferFlushTriggers) {
                trigger.notifyAddRequest(requestEntry, triggerId);
            }
        }

        private void notifyAllFlushTriggersOnRemove(RequestEntryT requestEntry) {
            long triggerId = Instant.now().toEpochMilli();
            for (AsyncSinkBufferFlushTrigger<RequestEntryT> trigger : this.bufferFlushTriggers) {
                trigger.notifyRemoveRequest(requestEntry, triggerId);
            }
        }
    }
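To illustrate how canAddRequestEntry combines blocking strategies, here is a small self-contained sketch. BufferStateView, BlockingStrategy and BlockingCheck are hypothetical simplifications of the buffer state and blocking strategy types used in this proposal, and the MaxBufferedRequestsBlockingStrategy shown is only one possible implementation.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical read-only view of the buffer handed to strategies.
final class BufferStateView<T> {
    private final Collection<T> entries;

    BufferStateView(Collection<T> entries) {
        this.entries = Collections.unmodifiableCollection(entries);
    }

    int size() {
        return entries.size();
    }
}

// Simplified stand-in for the proposed blocking strategy interface.
interface BlockingStrategy<T> {
    boolean shouldBlock(BufferStateView<T> state, T newEntry);
}

// Assumed implementation: blocks once the buffer holds maxBufferedRequests entries.
class MaxBufferedRequestsBlockingStrategy<T> implements BlockingStrategy<T> {
    private final int maxBufferedRequests;

    MaxBufferedRequestsBlockingStrategy(int maxBufferedRequests) {
        this.maxBufferedRequests = maxBufferedRequests;
    }

    @Override
    public boolean shouldBlock(BufferStateView<T> state, T newEntry) {
        return state.size() >= maxBufferedRequests;
    }
}

final class BlockingCheck {
    private BlockingCheck() {}

    // Mirrors the noneMatch composition: a single objecting strategy blocks the add.
    static <T> boolean canAdd(
            Deque<T> buffer, List<BlockingStrategy<T>> strategies, T newEntry) {
        BufferStateView<T> state = new BufferStateView<>(buffer);
        return strategies.stream().noneMatch(s -> s.shouldBlock(state, newEntry));
    }
}
```

With this composition, adding a further strategy can only make the buffer more restrictive, never less, so the existing maxBufferedRequests behaviour is preserved when implementers register extra strategies.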
Proposed changes to the public interfaces are:
- Mark the abstract method getSizeInBytes in AsyncSinkWriter as @Deprecated.
- Deprecate the current constructor of AsyncSinkWriterConfiguration and add a new constructor which drops maxBatchSizeInBytes and maxRecordSizeInBytes.
- Add the new @PublicEvolving interfaces BufferBlockingStrategy and AsyncSinkBufferFlushTrigger with the aforementioned templates.
- Add a new constructor for AsyncSinkWriter with List<BufferBlockingStrategy> and List<AsyncSinkBufferFlushTrigger> as parameters, so that implementers are able to override the defaults.
- Implement default BatchSizeFlushTrigger, TimeBasedBufferFlushTrigger and SizeInBytesBufferFlushTrigger, which contain the current flush trigger logic, and pass them as parameters to the buffer from all default constructors to maintain backward compatibility.
- Implement a default MaxBufferedRequestsBlockingStrategy and pass it as a parameter from all default constructors to maintain backward compatibility.
Compatibility, Deprecation, and Migration Plan
- There will be no impact on existing users of the Async Sink, since the behaviour is unchanged.
- There is no deprecation plan for existing behavior, since we maintain backward compatibility.
- We will apply the corresponding changes to the flink-aws-connectors repo, which contains the KDS, KDF and DDB sinks, and follow up with a release after the Flink release.
Test Plan
Modifying the existing AsyncSinkWriter unit tests should be enough.
We will also add unit tests for the buffer and for all implemented blocking strategies and flush triggers.
Rejected Alternatives
N/A