Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread

...

...

Vote threadhttps://

...

...

9z3nfgv57g4cmwcl90r4h5tct9h2qgvv
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-28487

Release1.16

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. AsyncSinkWriter will construct a new request with a batch of messages:
    1. This can be triggered by one of 3 conditions: Timer trigger, batch byte size threshold, batch number size threshold.
    2. AsyncSinkWriter will call RateLimitingStrategy.getMaxBatchSize() to determine the max batch size to include in the request.
  2. AsyncSinkWriter then calls RateLimitingStrategy.shouldBlock() to decide if the newest request should be sent/blocked.
  3. If we decide to send the request, AsyncSinkWriter will call RateLimitingStrategy.startedRequestregisterCompletedRequest() to provide information to the RateLimitingStrategy that request has been sent.
  4. Once request completes, AsyncSinkWriter will call RateLimitingStrategy.completedRequestregisterCompletedRequest() to provide information to the RateLimitingStrategy with information of completed request (e.g. how many failed messages).

...

Code Block
languagejava
titleRateLimitingStrategy
/**
 * RateLimitingStrategy is consulted before sending a request in AsyncSinkWriter to decide if a request should be sent.
 */
public interface RateLimitingStrategy {
    /**
     * TracksRegisters the information of requests being sent (e.g. to track the current inFlightMessages / requests)
     * @param requestInfo    Data class containing information on request being sent
     */
    void startedRequestregisterInFlightRequest(RequestInfo requestInfo);

	 /**
     * Tracks the information of requests completing (e.g. to track the current inFlightMessages / requests).
	 * Any dynamic scaling on failed requests should be done here too.
     * @param requestInfo    Data class containing information on request completed
     */
    void completedRequestregisterCompletedRequest(RequestInfo requestInfo);

 	 /**
     * Decides whether the next request should be blocked.
     * @param requestInfo    Data class containing information on request being sent
     */
    boolean shouldBlock(RequestInfo requestInfo);

	 /**
     * Returns the max batch size to be used by the AsyncSinkWriter.
     */
    int getMaxBatchSize();
}

/**
 * This data class encapsulates the information of starting / completing requests.
 */
public final class RequestInfo {
	public int failedMessages;
	public int batchSize;
	public long requestStartTime;
}

...

We will also migrate the existing Firehose and Kinesis sinks to use the new interfaces interface proposed here. There will be no change in behaviour, since the CongestionControlRateLimitingStrategy configured will be the same as the current strategy.

Example usage of RateLimitingStrategy

This logic will be in the AsyncSinkWriter, but this is an example use case for the interface proposed here

Code Block
languagejava
titleExample usage of RateLimitingStrategy
RequestInfo info = buildRequestInfo(strategy.getMaxBatchSize());

while (strategy.shouldBlock(info)) {
    sleep();
    info = buildRequestInfo(strategy.getMaxBatchSize());
}

strategy.registerInFlightRequest(info);

// Send the batch

strategy.registerCompletedRequest(info);

Compatibility, Deprecation, and Migration Plan

...