Status

Current state: Under Discussion

Discussion thread: https://.../thread/9z3nfgv57g4cmwcl90r4h5tct9h2qgvv

JIRA: FLINK-28487

Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Async Sink was introduced as part of FLIP-171 and implements a non-configurable congestion control rate limiting strategy to reduce network congestion when the destination rejects or throttles requests. We want to make this strategy configurable, so that the sink implementer can decide the desired congestion control behaviour for a specific destination.

Background on Async Sink

The main component of a sink (with at-least-once semantics) is the SinkWriter, which is responsible for accepting input elements from the Flink DataStream in the write() method and for writing them to the destination in the flush() method. See FLIP-143 for more details on the Sink API.
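
For orientation, the SinkWriter contract can be sketched roughly as follows. This is a simplified sketch, not the exact Flink interface; see FLIP-143 and the Sink V2 interfaces for the precise signatures.

Code Block
languagejava
titleSinkWriter (simplified sketch, not the exact Flink interface)
public interface SinkWriter<InputT> extends AutoCloseable {

    /** Accepts an element from the DataStream; typically buffers it for a later flush. */
    void write(InputT element, Context context) throws IOException, InterruptedException;

    /** Writes buffered elements to the destination; called on checkpoint and at end of input. */
    void flush(boolean endOfInput) throws IOException, InterruptedException;
}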

The design principle behind the Async Sink is to abstract away common logic used when plugging into this Sink API, to write asynchronously to a destination with at-least-once semantics. This logic includes persisting un-flushed records, handling asynchronous responses, deciding when to flush and dynamically adjusting the request rate to the destination.

The Async Sink consists of an ElementConverter and an AsyncSinkWriter. The ElementConverter handles converting DataStream elements (InputT) into messages specific to the destination (RequestEntryT). The AsyncSinkWriter handles the logic of actually making an asynchronous request to the destination: it receives a batch of messages (List<RequestEntryT>), makes the asynchronous request(s), and invokes a callback with a batch of any failed messages. See FLIP-171 or this blog post for more details on the Async Sink.
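
As an illustration only, an ElementConverter for a hypothetical destination could look like the sketch below. MyDestinationRecord and MyElementConverter are made-up names for this example, and the snippet assumes the ElementConverter interface from flink-connector-base with its single apply(element, context) method; package locations may differ slightly between Flink versions.

Code Block
languagejava
titleElementConverter sketch (illustrative only)
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

// Hypothetical destination-specific request entry type (RequestEntryT), for illustration only.
class MyDestinationRecord implements java.io.Serializable {
    final byte[] payload;

    MyDestinationRecord(byte[] payload) {
        this.payload = payload;
    }
}

// Converts each DataStream element (InputT = String) into a destination-specific message.
class MyElementConverter implements ElementConverter<String, MyDestinationRecord> {
    @Override
    public MyDestinationRecord apply(String element, SinkWriter.Context context) {
        return new MyDestinationRecord(element.getBytes(StandardCharsets.UTF_8));
    }
}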


Rate limiting the Async Sink

In the current implementation, the Async Sink adjusts its behaviour depending on whether it can successfully persist messages (RequestEntryT) to the destination. In particular, it uses a non-configurable rate limiting strategy that reduces the number of in-flight messages (RequestEntryT) when any messages in a single batch fail, and increases the number of in-flight messages when all messages in a single batch succeed. The Async Sink adjusts the number of in-flight messages using the Additive Increase / Multiplicative Decrease (AIMD) algorithm, a feedback control method also used for TCP congestion control. This provides indirect control of the request rate: the sink eagerly increases its in-flight messages until it hits limits imposed by the destination (e.g. throttling). This works well to reduce the network congestion caused by the Flink connector.
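
For intuition, the AIMD adjustment can be sketched as below. The constants, names and starting values are purely illustrative and are not the values used by the Async Sink.

Code Block
languagejava
titleAIMD adjustment (illustrative sketch only)
// Illustrative AIMD update of the in-flight message limit.
// On a fully successful batch the limit grows additively; on any failure it shrinks multiplicatively.
public final class AimdSketch {

    private static final int INCREASE_RATE = 10;        // assumed additive step
    private static final double DECREASE_FACTOR = 0.5;  // assumed multiplicative factor
    private static final int MAX_LIMIT = 10_000;         // assumed upper bound

    private int inFlightMessageLimit = 100;               // assumed starting limit

    public void onBatchResult(boolean allMessagesSucceeded) {
        if (allMessagesSucceeded) {
            inFlightMessageLimit = Math.min(inFlightMessageLimit + INCREASE_RATE, MAX_LIMIT);
        } else {
            inFlightMessageLimit = Math.max((int) (inFlightMessageLimit * DECREASE_FACTOR), 1);
        }
    }
}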

We propose to make the following configurable:

  • What to scale: The current strategy only controls the number of in-flight messages. For destinations where the number of in-flight requests (batches of messages) matters more than the number of in-flight messages, the sink implementer should be able to decide which factor to control.
  • When to scale: The sink aggressively scales throughput up/down whenever there are successful/failed messages. The sink implementer should be able to make this decision, e.g. only scale down after a threshold of failed messages, or only scale up after X successful requests over Y time period.
  • How much to scale: The extent of scaling up/down is fixed. The sink implementer should be able to decide how aggressively they want to scale up/down in the event of failure/success.

Public Interfaces

Proposed changes to the public interfaces are:

  • Introduce a RateLimitingStrategy interface, which will expose methods to collect information about the requests being started and completed, as well as a method to decide whether to block the next request.
  • Add a builder to AsyncSinkWriter, which calls a private constructor with the option to specify a RateLimitingStrategy.
  • Mark the public constructor of AsyncSinkWriter as @Deprecated.

Proposed Changes

Overview

We propose to introduce a new RateLimitingStrategy interface. Before sending a request, the AsyncSinkWriter will consult the RateLimitingStrategy, which decides whether the request should be sent or blocked. All of the scaling decisions are made inside the RateLimitingStrategy:

  • What to scale: The strategy implementation decides which quantity to track and limit, e.g. in-flight messages or in-flight requests (other measures such as in-flight bytes can be supported in the future).
  • When to scale: Upon completion of asynchronous requests, the AsyncSinkWriter registers the completed request with the RateLimitingStrategy. This is done here since this is the point when we have information about whether requests are successful / need re-queuing. The decision of whether to scale up / down / do nothing is entirely up to the RateLimitingStrategy.
  • How much to scale: This decision is also entirely up to the RateLimitingStrategy.

List of changes

  • RateLimitingStrategy interface, with a RequestInfo data class

The interaction between the RateLimitingStrategy and AsyncSinkWriter when submitting a request will be as follows:

  1. AsyncSinkWriter will construct a new request with a batch of messages:
    1. This can be triggered by one of 3 conditions: a timer trigger, the batch byte-size threshold, or the batch count threshold.
    2. AsyncSinkWriter will call RateLimitingStrategy.getMaxBatchSize() to determine the max batch size to include in the request.
  2. AsyncSinkWriter then calls RateLimitingStrategy.shouldBlock() to decide if the new request should be sent or blocked.
  3. If we decide to send the request, AsyncSinkWriter calls RateLimitingStrategy.registerInFlightRequest() to inform the RateLimitingStrategy that the request has been sent.
  4. Once the request completes, AsyncSinkWriter calls RateLimitingStrategy.registerCompletedRequest() to provide the RateLimitingStrategy with information about the completed request (e.g. how many messages failed).

With this design, all these decisions are made internally within the RateLimitingStrategy; the AsyncSinkWriter only calls into the RateLimitingStrategy to check whether it should send the next request.

Code Block
languagejava
titleRateLimitingStrategy
/**
 * RateLimitingStrategy is consulted before sending a request in AsyncSinkWriter to decide if a request should be sent.
 */
public interface RateLimitingStrategy {

    /**
     * Registers the information of requests being sent (e.g. to track the current inFlightMessages / requests).
     * @param requestInfo    Data class containing information on request being sent
     */
    void registerInFlightRequest(RequestInfo requestInfo);

    /**
     * Tracks the information of requests completing (e.g. to track the current inFlightMessages / requests).
     * Any dynamic scaling on failed requests should be done here too.
     * @param requestInfo    Data class containing information on request completed
     */
    void registerCompletedRequest(RequestInfo requestInfo);

    /**
     * Decides whether the next request should be blocked.
     * @param requestInfo    Data class containing information on request being sent
     */
    boolean shouldBlock(RequestInfo requestInfo);

    /**
     * Returns the max batch size to be used by the AsyncSinkWriter.
     */
    int getMaxBatchSize();
}

/**
 * This data class encapsulates the information of starting / completing requests.
 */
public final class RequestInfo {
    public int failedMessages;
    public int batchSize;
    public long requestStartTime;
}
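
As a purely illustrative example (not part of the proposal), a trivial strategy that caps the number of concurrently in-flight requests could implement the interface as follows. The class name and its constructor parameters are hypothetical.

Code Block
languagejava
titleIllustrative RateLimitingStrategy implementation (hypothetical)
/** Hypothetical strategy that caps the number of concurrently in-flight requests at a fixed value. */
public class FixedInFlightRequestsStrategy implements RateLimitingStrategy {

    private final int maxInFlightRequests;
    private final int maxBatchSize;
    private int inFlightRequests;

    public FixedInFlightRequestsStrategy(int maxInFlightRequests, int maxBatchSize) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        // A request is about to be sent; count it as in flight.
        inFlightRequests++;
    }

    @Override
    public void registerCompletedRequest(RequestInfo requestInfo) {
        // The request finished; a dynamic strategy would also inspect
        // requestInfo.failedMessages here and scale its limits accordingly.
        inFlightRequests--;
    }

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        // Block the next request while the fixed cap is reached.
        return inFlightRequests >= maxInFlightRequests;
    }

    @Override
    public int getMaxBatchSize() {
        return maxBatchSize;
    }
}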


To make it easier for sink implementers, we will also provide an out-of-the-box implementation of RateLimitingStrategy, which uses the current AIMD strategy to control in-flight messages. This mimics the existing behaviour of the Async Sink.

Migration of existing Async Sinks

We will also migrate the existing Firehose and Kinesis sinks to use the new interface proposed here. There will be no change in behaviour, since the RateLimitingStrategy configured will be the same as the current strategy.

Example usage of RateLimitingStrategy

This logic will live in the AsyncSinkWriter; the following is an example use of the interface proposed here.

Code Block
languagejava
titleExample usage of RateLimitingStrategy
RequestInfo info = buildRequestInfo(strategy.getMaxBatchSize());

while (strategy.shouldBlock(info)) {
    sleep();
    info = buildRequestInfo(strategy.getMaxBatchSize());
}

strategy.registerInFlightRequest(info);

// Send the batch

strategy.registerCompletedRequest(info);
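
In the snippet above, buildRequestInfo() and sleep() are stand-ins for logic inside the AsyncSinkWriter. Purely as a sketch, and assuming a writer-internal buffer of pending request entries (the bufferedRequestEntries name is an assumption, not the actual implementation), building the RequestInfo could look roughly like this:

Code Block
languagejava
titlebuildRequestInfo sketch (illustrative only)
// Hypothetical helper inside the AsyncSinkWriter: describes the batch about to be sent.
private RequestInfo buildRequestInfo(int maxBatchSize) {
    RequestInfo info = new RequestInfo();
    // Never exceed the batch size allowed by the strategy or the number of buffered entries.
    info.batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size()); // bufferedRequestEntries is assumed
    info.requestStartTime = System.currentTimeMillis();
    return info;
}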

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users of the Async Sink, since the behaviour is unchanged. The @Deprecated constructor will be configured to have the exact same RateLimitingStrategy as it has currently.

There is no deprecation plan for existing behaviour, since we are not changing the behaviour, only making it configurable.

Test Plan

  • AsyncSinkWriter
    • We will introduce tests to verify that the RateLimitingStrategy specified is respected.
    • Tests will cover both in-flight messages and in-flight requests.
  • The provided AIMD-based RateLimitingStrategy follows a strict mathematical function, so we will test that its output follows that function (a sketch of the style of test follows below).
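
Purely as an illustration of the kind of assertion involved, the test below uses the hypothetical FixedInFlightRequestsStrategy sketched earlier together with JUnit 5; neither is mandated by this FLIP.

Code Block
languagejava
titleTest sketch (illustrative only)
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class RateLimitingStrategyTest {

    @Test
    void shouldBlockOnceInFlightRequestCapIsReached() {
        RateLimitingStrategy strategy = new FixedInFlightRequestsStrategy(2, 100);
        RequestInfo requestInfo = new RequestInfo();

        strategy.registerInFlightRequest(requestInfo);
        assertFalse(strategy.shouldBlock(requestInfo));

        strategy.registerInFlightRequest(requestInfo);
        assertTrue(strategy.shouldBlock(requestInfo));

        strategy.registerCompletedRequest(requestInfo);
        assertFalse(strategy.shouldBlock(requestInfo));
    }
}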

Rejected Alternatives

We rejected the alternative of creating a more general RateLimitingStrategy that can control the number of requests per unit time (e.g. 500 requests/minute). That would require us to expose controls to the RateLimitingStrategy to trigger a flush() on the AsyncSinkWriter, which increases the complexity of the Async Sink implementation. Since congestion control works well at the moment, we decided against this.