...

  • The administrator should be able to configure an "upper bound" on the rate at which log segments are uploaded to the remote storage.
  • The administrator should be able to configure an "upper bound" on the rate at which log segments are read from the remote storage.

Public Interfaces

Broker Configuration Options

New broker properties will be added to configure quotas for both reading from and writing to the remote storage. We will use the existing quota framework to keep track of the current read and write rates.

Configuration for write quotas:

  • remote.log.manager.write.quota.default
    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated to remote upload of segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.write.quota.window.num
    • Type: Integer
    • Mode: Static 
    • Description: The number of samples to retain in memory for remote upload quotas
    • Default value: 61
  • remote.log.manager.write.quota.window.size.seconds
    • Type: Integer
    • Mode: Static
    • Description: The time span of each sample for remote upload quotas  
    • Default value: 1

Configuration for read quotas:

  • remote.log.manager.read.quota.default
    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated for reading remote log segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.read.quota.window.num
    • Type: Integer
    • Mode: Static
    • Description: The number of samples to retain in memory for remote read quotas
    • Default value: 11
  • remote.log.manager.read.quota.window.size.seconds
    • Type: Integer
    • Mode: Static
    • Description: The time span of each sample for remote read quotas
    • Default value: 1
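
As an illustration of how the static window settings relate to each other, the sketch below loads the default values listed above from a properties snippet and multiplies the number of samples by the span of each sample. The class and method names are hypothetical, and treating the product as the maximum time span held in memory is an assumption of this sketch, not a statement about the quota framework's internals.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper, for illustration only. */
class QuotaWindowConfigSketch {
    // Static window settings, using the default values listed above.
    static final String BROKER_PROPS = String.join("\n",
        "remote.log.manager.write.quota.window.num=61",
        "remote.log.manager.write.quota.window.size.seconds=1",
        "remote.log.manager.read.quota.window.num=11",
        "remote.log.manager.read.quota.window.size.seconds=1");

    /** Number of retained samples times the span of each sample, in seconds. */
    static int windowSpanSeconds(String direction) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(BROKER_PROPS));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = "remote.log.manager." + direction + ".quota.window.";
        int num = Integer.parseInt(props.getProperty(prefix + "num"));
        int sizeSeconds = Integer.parseInt(props.getProperty(prefix + "size.seconds"));
        return num * sizeSeconds;
    }
}
```

With the defaults, the write quota retains up to 61 one-second samples and the read quota up to 11.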

Proposed Changes

This is an enhancement to the existing mechanism for reading log segments from and writing them to the remote storage. Without a read throttling mechanism, the remote storage can degrade when a large number of fetch requests read data from it. Likewise, without a mechanism to throttle writes to remote storage, the CPU on the brokers can become overly busy, degrading the Kafka cluster and increasing producer latencies.

...

It manages the quota for remote reads. It can be configured with the allowed read byte-rate for the Kafka broker. It uses a rolling window of 10 seconds and tracks the bytes read in each second of the window. It can be used to check whether the read quota has been exceeded, based on the data recorded over the last 10 seconds.
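
The rolling-window check described above can be sketched as follows. This is a simplified illustration and not the quota framework's actual implementation: the class name, the method names, and the choice to average the recorded bytes over the full window length are all assumptions. Timestamps passed in are assumed to be non-decreasing.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: tracks bytes read per one-second sample over a rolling window. */
class RollingReadQuotaSketch {
    private final long quotaBytesPerSec; // allowed read byte-rate for the broker
    private final int windowSeconds;     // 10 in the description above
    // Each entry is {secondIndex, bytesReadInThatSecond}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();

    RollingReadQuotaSketch(long quotaBytesPerSec, int windowSeconds) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.windowSeconds = windowSeconds;
    }

    /** Adds the bytes to the sample for the second that contains nowMs. */
    void record(long bytes, long nowMs) {
        long second = nowMs / 1000;
        purge(second);
        long[] last = samples.peekLast();
        if (last != null && last[0] == second) {
            last[1] += bytes;
        } else {
            samples.addLast(new long[] {second, bytes});
        }
    }

    /** True when the average rate over the window exceeds the quota (assumed definition). */
    boolean isQuotaExceeded(long nowMs) {
        purge(nowMs / 1000);
        long total = 0;
        for (long[] sample : samples) {
            total += sample[1];
        }
        return (double) total / windowSeconds > quotaBytesPerSec;
    }

    /** Drops samples that have fallen out of the window. */
    private void purge(long currentSecond) {
        while (!samples.isEmpty() && samples.peekFirst()[0] <= currentSecond - windowSeconds) {
            samples.removeFirst();
        }
    }
}
```

For example, with a quota of 100 bytes/sec and a 10-second window, recording 500 bytes and then 600 bytes one second later pushes the average to 110 bytes/sec, so the quota is reported as exceeded until the first sample ages out.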

Compatibility, Deprecation, and Migration Plan

...

Test Plan

Unit tests for both read and write quotas

Rejected Alternatives

Throttling Data Writes

  1. We can decrease the pool size of the ThreadPoolExecutor. Reducing the pool size would prevent too many concurrent uploads from running, and thus prevent over-utilization of the CPU. However, it also means that the expiration of remote segments will not run as frequently. We would also have to tune the thread pool size on each cluster depending on the hardware specifications.
  2. There already exists a framework for defining quotas on the rate of requests/bandwidth for producers and consumers. The framework tracks the rate of requests and the bandwidth consumed, and if the client is violating the configured quota, it computes a throttle time for which the client should not send more requests. This framework could be extended to support a quota on remote writes. We could specify the maximum rate at which we want to allow remote writes to happen per broker. We use this rate to define the maximum rate at which each thread in the thread pool is allowed to copy. When the task currently executing on a thread in the thread pool picks up a segment to upload, it records the bytes to copy and checks whether copying the segment would violate the configured quota. It receives a throttle time as a result. A non-zero throttle time means the thread should not upload the segment for that amount of time. We have the following options on how to use this throttle time:

    • The current thread sleeps for that amount of time before proceeding to copy the segment. This also means that the task cannot do other pipelined work, such as expiring the eligible remote segments.
    • The current thread gives up copying the segment for now, proceeds with other pipelined work, and re-attempts the copy after the throttle time. This requires updating the schedule of the next round, which in turn means updating the schedule of expiration, unless we also separate the two responsibilities into different tasks.
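
The two options above can be contrasted with a minimal sketch. All names here are hypothetical, and the throttle-time formula (the relative overshoot of the quota multiplied by the measurement window) is an assumption made for illustration; the source only states that the framework computes a throttle time.

```java
/** Hypothetical sketch of the two ways a copy task could react to a throttle time. */
class CopyThrottleSketch {
    /**
     * Assumed formula: pause long enough for the observed rate to fall back
     * to the quota over the given window. Returns 0 when within quota.
     */
    static long throttleTimeMs(double observedBytesPerSec, double quotaBytesPerSec, long windowMs) {
        if (observedBytesPerSec <= quotaBytesPerSec) {
            return 0L;
        }
        return Math.round((observedBytesPerSec - quotaBytesPerSec) / quotaBytesPerSec * windowMs);
    }

    /** Option 1: block the task thread, which delays the expiration work as well. */
    static void sleepThenCopy(long throttleMs, Runnable copySegment, Runnable expireSegments)
            throws InterruptedException {
        if (throttleMs > 0) {
            Thread.sleep(throttleMs);
        }
        copySegment.run();
        expireSegments.run();
    }

    /**
     * Option 2: skip the copy when throttled, still run the other pipelined work,
     * and return the earliest time (ms) at which the copy may be attempted.
     */
    static long skipCopyIfThrottled(long throttleMs, long nowMs,
                                    Runnable copySegment, Runnable expireSegments) {
        if (throttleMs == 0) {
            copySegment.run();
        }
        expireSegments.run();
        return nowMs + throttleMs;
    }
}
```

In the second variant the caller has to reschedule the task using the returned timestamp, which is the scheduling complication noted above.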

Throttling Data Reads

  • We can decrease the thread pool size used by the executor. Reducing the pool size would prevent too many concurrent reads. The downside is that we would have to tune the thread pool size depending on the hardware specifications of the cluster.
  • We extend the quota framework to support quota on remote reads. We could specify the maximum rate at which we want to read data from the remote storage. When a worker thread in the reader threadpool picks up a new read task, it checks if the read quota has been exceeded. If the quota has not been exceeded, it proceeds to execute the read task, else the read task must wait. This can be implemented in two ways:
    • Compute the throttle time using the quota framework and sleep for that amount of time before executing the request. This means that the threads of the thread pool will always look busy even if they are just waiting.
    • Instead of using a linked blocking queue in the ThreadPoolExecutor, we use a DelayQueue. The read task is re-queued with the computed throttle time as the delay.
      • DelayQueues are unbounded, so we will have to override it to make it bounded.
      • ThreadPoolExecutor needs a BlockingQueue<Runnable>, whereas a DelayQueue only accepts elements that implement Delayed, so a DelayQueue cannot be used with it directly.
    • Alternatively, we compute the throttle time using the quota framework and use it to throttle the client instead. The throttle time is propagated back in the response payload and we use it to throttle further fetch requests from the client. This approach is rejected because it also throttles fetch requests for topic partitions that do not need remote data.
  • We extend the quota framework to support client-level read quotas. We could define both request and bandwidth quotas per client for remote storage read requests. If the client violates any of the configured quotas, we throttle the next request from the client, by specifying the throttle time in the fetch response (Ref.) and muting the channel.
    • Allows us to throttle only the misbehaving clients and not all clients (including polite clients).
    • This approach is rejected because it also throttles fetch requests for topic partitions that do not need remote data.
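
For reference, the DelayQueue option listed above for remote reads can be sketched as follows. The task class and the demo method are hypothetical names introduced for illustration; the sketch only shows how a task re-queued with its throttle time as the delay becomes available after tasks that need no throttling. It does not address the unbounded-queue and ThreadPoolExecutor concerns noted in that option.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical read task that becomes available only after its throttle delay has elapsed. */
class DelayedReadTaskSketch implements Delayed {
    final String name;
    private final long readyAtNanos;

    DelayedReadTaskSketch(String name, long throttleMs) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(throttleMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemoSketch {
    /** Queues one throttled and one unthrottled task, then returns the order in which they come out. */
    static String drainOrder() {
        DelayQueue<DelayedReadTaskSketch> queue = new DelayQueue<>();
        queue.put(new DelayedReadTaskSketch("throttled", 200)); // re-queued with its throttle time
        queue.put(new DelayedReadTaskSketch("ready", 0));       // no throttling needed
        try {
            // take() blocks until the head of the queue has no remaining delay.
            String first = queue.take().name;
            String second = queue.take().name;
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

A worker thread blocked in take() is parked rather than sleeping inside a task, so it stays free to pick up whichever task becomes ready first.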