Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. AsyncSinkWriter will construct a new request with a batch of messages:
    1. This can be triggered by one of 3 conditions: Timer trigger, batch byte size threshold, batch number size threshold.
    2. AsyncSinkWriter will call RateLimitingStrategy.getMaxBatchSize() to determine the max batch size to include in the request.
  2. AsyncSinkWriter then calls RateLimitingStrategy.shouldBlock() to decide if the newest request should be sent/blocked.
  3. If we decide to send the request, AsyncSinkWriter will call RateLimitingStrategy.startedRequestregisterCompletedRequest() to provide information to the RateLimitingStrategy that request has been sent.
  4. Once request completes, AsyncSinkWriter will call RateLimitingStrategy.completedRequestregisterCompletedRequest() to provide information to the RateLimitingStrategy with information of completed request (e.g. how many failed messages).

...

Code Block
languagejava
titleRateLimitingStrategy
/**
 * RateLimitingStrategy is consulted before sending a request in AsyncSinkWriter to decide if a request should be sent.
 */
public interface RateLimitingStrategy {
    /**
     * TracksRegisters the information of requests being sent (e.g. to track the current inFlightMessages / requests)
     * @param requestInfo    Data class containing information on request being sent
     */
    void startedRequestregisterInFlightRequest(RequestInfo requestInfo);

	 /**
     * Tracks the information of requests completing (e.g. to track the current inFlightMessages / requests).
	 * Any dynamic scaling on failed requests should be done here too.
     * @param requestInfo    Data class containing information on request completed
     */
    void completedRequestregisterCompletedRequest(RequestInfo requestInfo);

 	 /**
     * Decides whether the next request should be blocked.
     * @param requestInfo    Data class containing information on request being sent
     */
    boolean shouldBlock(RequestInfo requestInfo);

	 /**
     * Returns the max batch size to be used by the AsyncSinkWriter.
     */
    int getMaxBatchSize();
}

/**
 * This data class encapsulates the information of starting / completing requests.
 */
public final class RequestInfo {
	public int failedMessages;
	public int batchSize;
	public long requestStartTime;
}

...

We will also migrate the existing Firehose and Kinesis sinks to use the new interfaces interface proposed here. There will be no change in behaviour, since the CongestionControlRateLimitingStrategy configured will be the same as the current strategy.

...