Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Async Sink was introduced as part of FLIP-171, and implements a non-configurable congestion control strategy to reduce network congestion when the destination rejects or throttles requests. We want to make this configurable, to allow the sink implementer to decide the desired congestion control behaviour given a specific destination.

Background on Async Sink

The main component of a sink (with at-least-once semantics) is the SinkWriter, which accepts input elements from the Flink DataStream in the write() method and writes them to a destination in the flush() method. See FLIP-143 for more details on the Sink API.

The design principle behind the Async Sink is to abstract away common logic used when plugging into this Sink API, to write asynchronously to a destination with at-least-once semantics. This logic includes persisting un-flushed records, handling asynchronous responses, deciding when to flush and dynamically adjusting the request rate to the destination.

The Async Sink consists of an ElementConverter and an AsyncSinkWriter. The ElementConverter converts DataStream elements (InputT) into messages specific to the destination (RequestEntryT). The AsyncSinkWriter handles the asynchronous request to the destination: it receives a batch of messages (List<RequestEntryT>), makes the asynchronous request(s), and invokes a callback with the batch of any failed messages. See FLIP-171 or this blog post for more details on the Async Sink.
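The write path described above can be sketched roughly as follows. This is a minimal, single-threaded illustration and not the actual AsyncSinkWriter: the class MiniAsyncWriter, its constructor arguments and bufferedCount() are hypothetical names, and the destination client is modelled as a plain function that returns a future holding the failed entries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical, simplified writer; NOT the Flink AsyncSinkWriter API. */
final class MiniAsyncWriter<InputT, RequestEntryT> {
    // Converts a stream element into a destination-specific message.
    private final Function<InputT, RequestEntryT> elementConverter;
    // Assumed client: sends a batch and completes with the entries that failed.
    private final Function<List<RequestEntryT>, CompletableFuture<List<RequestEntryT>>> client;
    // Un-flushed messages. Not thread-safe: assumes callbacks run on the writer thread.
    private final Deque<RequestEntryT> buffer = new ArrayDeque<>();

    MiniAsyncWriter(
            Function<InputT, RequestEntryT> elementConverter,
            Function<List<RequestEntryT>, CompletableFuture<List<RequestEntryT>>> client) {
        this.elementConverter = elementConverter;
        this.client = client;
    }

    /** Converts the element and buffers it until the next flush. */
    void write(InputT element) {
        buffer.addLast(elementConverter.apply(element));
    }

    /** Sends everything buffered as one batch and re-queues failed entries at the front. */
    CompletableFuture<Void> flush() {
        List<RequestEntryT> batch = new ArrayList<>(buffer);
        buffer.clear();
        return client.apply(batch).thenAccept(failed -> {
            // Iterate backwards so the failed entries keep their original order.
            for (int i = failed.size() - 1; i >= 0; i--) {
                buffer.addFirst(failed.get(i));
            }
        });
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

Re-queuing failed entries ahead of newer ones is one way to keep retries from being starved; the real sink may order them differently.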

Congestion control in the Async Sink

In the current implementation, the Async Sink adjusts its behaviour depending on whether it can successfully persist messages (RequestEntryT) to the destination. It uses a non-configurable rate limiting strategy: it reduces the number of in-flight messages when any message in a single batch fails, and increases it when all messages in a single batch succeed. The adjustment follows the Additive Increase / Multiplicative Decrease (AIMD) algorithm, a feedback control method also used for TCP congestion control. This controls the request rate only indirectly: the sink eagerly increases its in-flight messages until it hits a limit at the destination (e.g. it is throttled). This works well to reduce the congestion on the network caused by the Flink connector.
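The AIMD behaviour described above can be illustrated with a small sketch. The class AimdLimit and its parameter names are hypothetical and chosen for illustration only; the values and bounds used by the actual Async Sink are not stated here.

```java
/** Hypothetical AIMD limiter for the number of in-flight messages; not Flink API. */
final class AimdLimit {
    private final int increaseRate;      // additive step applied on success
    private final double decreaseFactor; // multiplicative factor applied on failure, in (0, 1)
    private final int maxLimit;          // upper bound for the limit
    private int limit;                   // current maximum number of in-flight messages

    AimdLimit(int initialLimit, int increaseRate, double decreaseFactor, int maxLimit) {
        if (initialLimit < 1 || increaseRate < 1 || maxLimit < initialLimit) {
            throw new IllegalArgumentException("limits and increase rate must be positive");
        }
        if (decreaseFactor <= 0.0 || decreaseFactor >= 1.0) {
            throw new IllegalArgumentException("decreaseFactor must be in (0, 1)");
        }
        this.limit = initialLimit;
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.maxLimit = maxLimit;
    }

    /** All messages in a batch succeeded: grow linearly, capped at maxLimit. */
    int onSuccess() {
        limit = Math.min(maxLimit, limit + increaseRate);
        return limit;
    }

    /** At least one message in a batch failed: shrink multiplicatively, never below 1. */
    int onFailure() {
        limit = Math.max(1, (int) (limit * decreaseFactor));
        return limit;
    }

    int current() {
        return limit;
    }
}
```

The asymmetry is the point of AIMD: the limit backs off quickly when the destination pushes back, and recovers in small steps while requests succeed.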

We propose to make the following configurable:

  • What to scale: The current strategy only controls the number of in-flight messages. For destinations where the number of in-flight requests (batches of messages) is more important than in-flight messages, the sink implementer should be able to decide which factor to control.
  • When to scale: The sink aggressively scales throughput up or down whenever messages succeed or fail. The sink implementer should be able to make this decision, e.g. only scale down after a threshold of failed messages, or only scale up after X successful requests within a time period Y.
  • How much to scale: The extent of scaling up/down is fixed. The sink implementer should be able to decide how aggressively to scale up/down in the event of success/failure.

Public Interfaces

Proposed changes to the public interfaces are:

  • Introduce CongestionControlStrategy interface, which will decide when to scale
  • Introduce ScalingStrategy interface, which will decide how much to scale
  • Introduce CongestionControlConfig class with builder, which will hold the chosen CongestionControlStrategy and ScalingStrategy, as well as what to scale
  • Add a builder to AsyncSinkWriter, which calls a private constructor with the option to specify a CongestionControlConfig
  • Mark the public constructor of AsyncSinkWriter as @Deprecated

Proposed Changes

Overview

We propose to introduce three new elements - CongestionControlConfig, CongestionControlStrategy and ScalingStrategy. CongestionControlConfig will contain information on what to scale, CongestionControlStrategy will decide when to scale, and ScalingStrategy will decide how much to scale.

  • What to scale: We will give the sink implementer the option to specify either maxInFlightMessages or maxInFlightRequests as the congestion control target in the CongestionControlConfig. We can add other configurations in the future (e.g. maxInFlightBytes).
  • When to scale: Upon completion of asynchronous requests, the Async Sink will check if any in-flight configurations should be adjusted by checking with the CongestionControlStrategy. This check is done here since this is the point when we have information about whether requests are successful / need re-queuing. The decision of whether to scale up / down / do nothing will be entirely up to the CongestionControlStrategy.
  • How much to scale: This decision will be entirely up to the ScalingStrategy.
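The split between "when" and "how much" described in the list above can be sketched as follows. All names here (ScalingDecision, WhenToScale, HowMuchToScale, InFlightLimit) are hypothetical stand-ins chosen for illustration; they are not the proposed CongestionControlStrategy or ScalingStrategy signatures.

```java
/** Hypothetical outcome of the "when to scale" decision. */
enum ScalingDecision { SCALE_UP, SCALE_DOWN, NO_CHANGE }

/** Hypothetical stand-in for the "when to scale" role. */
interface WhenToScale {
    ScalingDecision decide(int succeededMessages, int failedMessages);
}

/** Hypothetical stand-in for the "how much to scale" role. */
interface HowMuchToScale {
    int scaleUp(int currentLimit);
    int scaleDown(int currentLimit);
}

/** Holds the scaled target (e.g. max in-flight messages) and applies both decisions. */
final class InFlightLimit {
    private final WhenToScale whenToScale;
    private final HowMuchToScale howMuchToScale;
    private int limit;

    InFlightLimit(int initialLimit, WhenToScale whenToScale, HowMuchToScale howMuchToScale) {
        this.limit = initialLimit;
        this.whenToScale = whenToScale;
        this.howMuchToScale = howMuchToScale;
    }

    /** Called when an asynchronous request completes and its outcome is known. */
    void onRequestCompleted(int succeededMessages, int failedMessages) {
        switch (whenToScale.decide(succeededMessages, failedMessages)) {
            case SCALE_UP:
                limit = howMuchToScale.scaleUp(limit);
                break;
            case SCALE_DOWN:
                // Keep at least one message in flight so the sink can make progress.
                limit = Math.max(1, howMuchToScale.scaleDown(limit));
                break;
            default:
                break; // NO_CHANGE
        }
    }

    int limit() {
        return limit;
    }
}
```

Keeping the two roles separate lets an implementer swap the trigger (for example a failure threshold) without touching the scaling arithmetic, and vice versa.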

List of changes

  • CongestionControlStrategy interface, with CongestionControlInfo data class

...

  • Add a builder to AsyncSinkWriter, which calls a private constructor with the option to specify a CongestionControlConfig
  • To make it easier for sink implementers, we will also provide two out-of-the-box implementations: ThresholdCongestionControlStrategy and AIMDScalingStrategy. Combined, these two mimic the existing behaviour of the Async Sink.
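A threshold-based "when to scale" decision could look roughly like the sketch below. The exact semantics of ThresholdCongestionControlStrategy are not spelled out here, so this is only one plausible reading: FailureRatioThreshold, ThresholdDecision and the rule "scale down once the failed fraction of a completed request reaches the threshold" are assumptions made for illustration.

```java
/** Hypothetical decision outcome; not part of the proposed API. */
enum ThresholdDecision { SCALE_UP, SCALE_DOWN, NO_CHANGE }

/** Hypothetical threshold rule based on the fraction of failed messages. */
final class FailureRatioThreshold {
    private final double scaleDownFailureThreshold; // fraction of failed messages, in [0, 1]

    FailureRatioThreshold(double scaleDownFailureThreshold) {
        if (scaleDownFailureThreshold < 0.0 || scaleDownFailureThreshold > 1.0) {
            throw new IllegalArgumentException("threshold must be in [0, 1]");
        }
        this.scaleDownFailureThreshold = scaleDownFailureThreshold;
    }

    ThresholdDecision decide(int succeededMessages, int failedMessages) {
        int total = succeededMessages + failedMessages;
        if (total == 0) {
            return ThresholdDecision.NO_CHANGE; // nothing completed, nothing to learn
        }
        if (failedMessages == 0) {
            return ThresholdDecision.SCALE_UP; // fully successful request
        }
        double failureRatio = (double) failedMessages / total;
        // Assumed rule: tolerate failures below the threshold without scaling.
        return failureRatio >= scaleDownFailureThreshold
                ? ThresholdDecision.SCALE_DOWN
                : ThresholdDecision.NO_CHANGE;
    }
}
```

Under this reading, a threshold of 0.0 scales down on any failed message, which resembles the current behaviour of reducing in-flight messages whenever a batch contains a failure.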

Example configuring the Congestion Control Strategy using the new interfaces

With the changes listed above, a sink implementer can configure the CongestionControlConfig as follows:

// Obtain the builder via the static factory method (no "new" before a static call).
CongestionControlConfig congestionControlConfig = CongestionControlConfig.builder()
        // When to scale
        .withCongestionControlStrategy(ThresholdCongestionControlStrategy.builder()
            .withScaleDownFailureThreshold(0.5)
            .build())
        // How much to scale
        .withScalingStrategy(AIMDScalingStrategy.builder()
            .withIncreaseRate(10)
            .withDecreaseFactor(0.5)
            .withThreshold(100)
            .build())
        // What to scale
        .forTarget(CongestionControlConfig.Target.IN_FLIGHT_MESSAGES)
        .build();

Migration of existing Async Sinks

We will also migrate the existing Firehose and Kinesis sinks to use the new interfaces proposed here. There will be no change in behaviour, since the CongestionControlStrategy configured will be the same as the current strategy.

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users of the Async Sink, since the behaviour is unchanged. The @Deprecated constructor will configure the same CongestionControlStrategy that the sink uses today.

There is no deprecation plan for existing behaviour, since we are not changing the behaviour, only making it configurable.

Test Plan

  • AsyncSinkWriter
    • We will introduce tests to verify that the CongestionControlStrategy and ScalingStrategy specified in the CongestionControlConfig are respected.
    • Tests will cover both in-flight messages and in-flight requests
  • ThresholdCongestionControlStrategy and AIMDScalingStrategy each follow a strict mathematical function, so we will test that their output matches that function.

Rejected Alternatives

We rejected the alternative of creating a more general RateLimitStrategy that can control the number of requests per unit time (e.g. 500 requests/minute). That would require us to expose controls to the RateLimitStrategy to trigger a flush() on the AsyncSinkWriter, which increases the complexity of the Async Sink implementation. Since congestion control works well at the moment, we decided against this.