
Status

Current state: Draft

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Remote Log Manager (RLM) is one of the critical components in Kafka that help support tiered storage. One of its responsibilities is to periodically upload rolled-over log segments from the local disk to the remote storage: it discovers the log segments eligible for upload and then uploads them. The RLM creates one such task (RLMTask) for each topic partition managed by the broker and schedules these tasks to run at a fixed frequency (every 30 seconds) using a ScheduledThreadPoolExecutor.

When an RLMTask identifies a list of segments to upload, it tries to upload all of them in the same run. As a result, if there is a large backlog of eligible segments, the task processes them as quickly as possible. Since multiple such tasks run concurrently within the thread-pool executor, they can consume significant CPU, thereby affecting producer latencies. This is often observed when tiered storage is enabled for existing topics on a cluster.

Controlling the rate of segment uploads to remote storage would prevent over-utilization of CPU and cluster degradation. 

Similarly, the RLM also plays a role in serving read requests for log segments residing in the remote storage. When it receives a request for a remote read, the RLM submits an async read task to its internal thread pool executor. The async task reads data from the remote storage, which is then sent back in the fetch response.

A large number of read requests for data residing on the remote storage could degrade the remote storage. This may happen when the majority of consumers start reading from the earliest offset of their respective Kafka topics. To prevent such degradation, we need a mechanism to control the rate at which log segments are read from the remote storage.

Goals

  • The administrator should be able to configure an "upper bound" on the rate at which log segments are uploaded to the remote storage.
  • The administrator should be able to configure an "upper bound" on the rate at which log segments are read from the remote storage.

Public Interfaces

Broker Configuration Options

New broker properties will be added to configure quotas for both reading from and writing to the remote storage. We will use the existing quota framework to track the current read and write rates. An illustrative example of how these properties could be set follows the property lists below.

Configuration for write quotas:

  • remote.log.manager.write.quota.default
    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated to remote upload of segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.write.quota.window.num
    • Type: Integer
    • Mode: Static
    • Description: The number of samples to retain in memory for remote upload quotas
    • Default value: 61
  • remote.log.manager.write.quota.window.size.seconds
    • Type: Integer
    • Mode: Static
    • Description: The time span of each sample for remote upload quotas
    • Default value: 1

Configuration for read quotas:

  • remote.log.manager.read.quota.default
    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated for reading remote log segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.read.quota.window.num
    • Type: Integer
    • Mode: Static
    • Description: The number of samples to retain in memory for remote read quotas
    • Default value: 11
  • remote.log.manager.read.quota.window.size.seconds
    • Type: Integer
    • Mode: Static
    • Description: The time span of each sample for remote read quotas
    • Default value: 1
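
For illustration, the static window properties could be set in server.properties, while the byte-rate quotas (being dynamically configurable) could be applied as a cluster-wide default via kafka-configs.sh. The byte-rate values below (50 MB/s for writes, 100 MB/s for reads) are placeholders, not recommendations:

    # server.properties (static; requires a broker restart to change)
    remote.log.manager.write.quota.window.num=61
    remote.log.manager.write.quota.window.size.seconds=1
    remote.log.manager.read.quota.window.num=11
    remote.log.manager.read.quota.window.size.seconds=1

    # Dynamic cluster-wide defaults for the byte-rate quotas (no restart required)
    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type brokers --entity-default \
      --add-config remote.log.manager.write.quota.default=52428800
    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type brokers --entity-default \
      --add-config remote.log.manager.read.quota.default=104857600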

Proposed Changes

This is an enhancement to the existing mechanism for reading/writing log segments from/to the remote storage. Without a read throttling mechanism, the remote storage can degrade when a large number of fetch requests ask for data from the remote storage. Likewise, without a mechanism to throttle writes to the remote storage, the brokers' CPUs can become saturated, degrading the Kafka cluster and impacting producer latencies.

Throttling Data Writes

We add a new component, the RLM WriteQuotaManager, to manage quotas for remote writes. It is configured with the permitted write quota. It is then used to record the quota consumed after each data upload and to check whether the quota has already been exceeded.

After identifying the log segments eligible for upload for a given topic partition, the RLM task uploads the segments to the remote storage in a loop. We make the following enhancements to the RLM task (a sketch of the resulting copy loop follows this list):

  1. Before uploading the next log segment, the task checks whether the write quota has already been violated. If it has not, the task uploads the segment to the remote storage and then records the number of bytes copied with the WriteQuotaManager, so the quota manager always has an up-to-date view of quota utilization.
  2. If the quota is already exhausted, the RLM task aborts uploading any further log segments of the topic partition and moves on to its remaining responsibilities. It will attempt to upload the pending log segments on its next run (scheduled every 30 seconds).
  3. As a large number of segments are uploaded, the bytes written to the remote storage increase and eventually hit the quota. At that point no more segments are uploaded, and the next upload must wait until the write quota consumption has fallen back within the permitted rate.
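
A minimal sketch of the enhanced copy loop is shown below. The WriteQuotaManager and SegmentUploader interfaces, and all names in this sketch, are illustrative placeholders used to show the control flow; they are not the final implementation.

    import java.util.List;

    // Illustrative-only sketch of the enhanced RLMTask copy loop; the real task lives
    // inside RemoteLogManager and uses its own segment and upload types.
    public class RlmTaskCopySketch {

        /** Hypothetical stand-in for the RLM WriteQuotaManager described in this KIP. */
        interface WriteQuotaManager {
            boolean isQuotaExceeded();
            void record(long bytesCopied);
        }

        /** Hypothetical stand-in for the code that copies one segment to remote storage. */
        interface SegmentUploader {
            /** Uploads one rolled-over segment and returns the number of bytes copied. */
            long copySegmentToRemote(String segmentName);
        }

        private final WriteQuotaManager writeQuotaManager;
        private final SegmentUploader uploader;

        public RlmTaskCopySketch(WriteQuotaManager writeQuotaManager, SegmentUploader uploader) {
            this.writeQuotaManager = writeQuotaManager;
            this.uploader = uploader;
        }

        public void copyLogSegmentsToRemote(List<String> candidateSegments) {
            for (String segment : candidateSegments) {
                // Step 1: check the quota before starting the next upload.
                if (writeQuotaManager.isQuotaExceeded()) {
                    // Step 2: quota exhausted, stop copying for this partition and move on
                    // to the task's other work; the remaining segments are retried on the
                    // next scheduled run (every 30 seconds).
                    return;
                }
                // Quota available: upload the segment, then record the bytes copied so the
                // quota manager has an up-to-date view of utilization (steps 1 and 3).
                long bytesCopied = uploader.copySegmentToRemote(segment);
                writeQuotaManager.record(bytesCopied);
            }
        }
    }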

RLM WriteQuotaManager

It manages the quota for remote writes. It is configured with the allowed write byte-rate per RLM writer thread and tracks the byte rate for each thread of the ScheduledThreadPoolExecutor used to execute RLMTask(s). It uses a rolling window (of 60 seconds) to track the bytes uploaded during each second of the window, and it can be used to check whether the quota for a given writer thread has been exceeded based on the data recorded over the last 60 seconds.
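
Such a quota manager could be built on the existing quota/metrics primitives (Sensor, Quota, SimpleRate), following the same pattern Kafka already uses for replication quotas. The sketch below is a simplified, single-sensor illustration (whereas the proposal tracks the rate per writer thread); the class, sensor, and metric names are placeholders. The same structure applies to the read quota manager described later.

    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Quota;
    import org.apache.kafka.common.metrics.QuotaViolationException;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.SimpleRate;

    // Illustrative sketch only; class, sensor, and metric names are placeholders.
    public class RemoteStorageQuotaManagerSketch {

        private final Sensor sensor;

        public RemoteStorageQuotaManagerSketch(Metrics metrics, String name, long bytesPerSecQuota,
                                               int numSamples, int windowSizeSeconds) {
            // Rolling window of numSamples samples, each windowSizeSeconds long
            // (e.g. 61 x 1s for writes), bounded by the configured byte rate.
            MetricConfig config = new MetricConfig()
                    .samples(numSamples)
                    .timeWindow(windowSizeSeconds, TimeUnit.SECONDS)
                    .quota(Quota.upperBound(bytesPerSecQuota));
            this.sensor = metrics.sensor(name, config);
            this.sensor.add(metrics.metricName(name + "-byte-rate", "remote-log-manager"),
                    new SimpleRate());
        }

        /** Records bytes transferred once an upload (or read) completes. */
        public void record(long bytes) {
            try {
                sensor.record(bytes);
            } catch (QuotaViolationException e) {
                // The bytes were already transferred; the violation surfaces on the
                // next isQuotaExceeded() check.
            }
        }

        /** Returns true if the byte rate over the rolling window exceeds the quota. */
        public boolean isQuotaExceeded() {
            try {
                sensor.checkQuotas();
                return false;
            } catch (QuotaViolationException e) {
                return true;
            }
        }
    }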

Throttling Data Reads

We add a new component, the RLM ReadQuotaManager, to manage quotas for remote reads. It is configured with the permitted read quota. It is then used to record the quota consumed after each data read and to check whether the quota has already been exceeded.

When a fetch request requires remote data for a topic partition, the ReplicaManager queries the ReadQuotaManager to check whether the quota for reading remote data has already been exhausted. If it has not, the ReplicaManager returns the RemoteStorageFetchInfo that can be used to read the requested data from the remote storage. Once the data has been read from the remote storage, the RLM records the number of bytes read with the ReadQuotaManager, so the quota manager always has an up-to-date view of quota utilization.

However, if the quota has already been exhausted, the ReplicaManager returns an empty RemoteStorageFetchInfo. The fetch request can still read data for the other topic partitions, but there will be no data for the topic partition requiring remote data for as long as the read quota remains exhausted. Once the read quota consumption falls back within the permitted rate, fetch requests resume serving remote data.
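
The sketch below illustrates how the fetch path could gate remote reads on the quota and how the RLM could record the bytes read afterwards. As with the earlier sketches, the types and method names are placeholders, not the actual ReplicaManager or RemoteLogManager interfaces.

    import java.util.Optional;

    // Illustrative sketch of gating remote fetches on the read quota.
    public class RemoteReadGateSketch {

        /** Hypothetical stand-in for the RLM ReadQuotaManager described in this KIP. */
        interface ReadQuotaManager {
            boolean isQuotaExceeded();
            void record(long bytesRead);
        }

        /** Minimal stand-in for the information needed to fetch data from remote storage. */
        static final class RemoteFetchInfo {
            final String topicPartition;
            final long fetchOffset;

            RemoteFetchInfo(String topicPartition, long fetchOffset) {
                this.topicPartition = topicPartition;
                this.fetchOffset = fetchOffset;
            }
        }

        private final ReadQuotaManager readQuotaManager;

        public RemoteReadGateSketch(ReadQuotaManager readQuotaManager) {
            this.readQuotaManager = readQuotaManager;
        }

        /**
         * Called on the fetch path when the requested offset lives in remote storage.
         * Returns empty while the read quota is exhausted, so the fetch response carries
         * no data for this partition while other (local) partitions are still served.
         */
        public Optional<RemoteFetchInfo> maybeBuildRemoteFetchInfo(String topicPartition, long fetchOffset) {
            if (readQuotaManager.isQuotaExceeded()) {
                return Optional.empty();
            }
            return Optional.of(new RemoteFetchInfo(topicPartition, fetchOffset));
        }

        /** Called after the async read task has fetched data from remote storage. */
        public void onRemoteReadCompleted(long bytesRead) {
            readQuotaManager.record(bytesRead);
        }
    }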

RLM ReadQuotaManager

It manages the quota for remote reads. It is configured with the allowed read byte-rate for the Kafka broker. It uses a rolling window (of 10 seconds) to track the bytes read during each second of the window, and it can be used to check whether the read quota has been exceeded based on the data recorded over the last 10 seconds.
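
Reusing the Sensor-based sketch shown above for the write quota manager, the read-side manager could be wired with the read defaults (11 one-second samples, i.e. a roughly 10-second rolling window). The byte-rate value below is only an example:

    // Illustrative wiring only; values and names are placeholders.
    Metrics metrics = new Metrics();
    long readBytesPerSec = 100L * 1024 * 1024;  // example: 100 MB/s
    RemoteStorageQuotaManagerSketch readQuotaManager =
            new RemoteStorageQuotaManagerSketch(metrics, "remote-fetch", readBytesPerSec, 11, 1);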

Test Plan

Unit tests will be added for both read and write quotas.

Rejected Alternatives

Throttling Data Writes

  1. We can decrease the pool size of the ThreadPoolExecutor. Reducing the pool size would prevent too many concurrent uploads from running, thus preventing over-utilization of CPU. However, this also means that the expiration of remote segments will not run as frequently. We would also have to tune the thread pool size for each cluster depending on its hardware specifications.
  2. There already exists a framework for defining quotas on the request rate and bandwidth of producers and consumers. The framework tracks the request rate and bandwidth consumed, and if a client violates its configured quota, it computes a throttle time for which the client should not send more requests.

This framework could be extended to support a quota on remote writes. We could specify the maximum rate at which remote writes are allowed per broker and use it to define the maximum rate at which each thread in the thread pool is allowed to copy. When the task currently executed by a thread picks up a segment to upload, it records the bytes to copy and checks whether copying the segment would violate the defined quota, receiving a throttle time as a result. A non-zero throttle time means the thread should not upload the segment for that amount of time. We have the following options on how to use this throttle time:

    • The current thread sleeps for that amount of time before proceeding to copy the segment. This also means that the task cannot do other pipelined work, such as expiring the eligible remote segments, while it sleeps.
    • The current thread gives up copying the segment for now, proceeds with other pipelined work, and re-attempts to copy the segment after the throttle time. This requires updating the schedule of the next round, which also means updating the schedule of expiration, unless we separate the responsibilities into two different tasks.

Throttling Data Reads

  • We can decrease the thread pool size used by the executor. Reducing the pool size would prevent too many concurrent reads. The downside is that we would have to tune the thread pool size depending on the hardware specifications of the cluster.
  • We extend the quota framework to support a quota on remote reads. We could specify the maximum rate at which we want to read data from the remote storage. When a worker thread in the reader thread pool picks up a new read task, it checks whether the read quota has been exceeded. If it has not, the thread proceeds to execute the read task; otherwise the read task must wait. This can be implemented in two ways:
    • Compute the throttle time using the quota framework and sleep for that amount of time before executing the request. This means that the threads of the threadpool will always look busy even if they are just waiting.
    • Instead of using a LinkedBlockingQueue in the ThreadPoolExecutor, we use a DelayQueue. The read task is re-queued with the computed throttle time as the delay.
      • DelayQueues are unbounded, so we would have to override it to make it bounded.
      • ThreadPoolExecutor needs a BlockingQueue<Runnable>, while a DelayQueue only accepts elements that implement Delayed, hence a DelayQueue cannot be used with it directly.
    • Alternatively, we compute the throttle time using the quota framework and use it to throttle the client instead. The throttle time is propagated back in the response payload and we use it to throttle further fetch requests from the client. This approach is rejected because it also throttles fetch requests for topic partitions that do not need remote data.
  • We extend the quota framework to support client-level read quotas. We could define both request and bandwidth quotas per client for remote storage read requests. If the client violates any of the configured quotas, we throttle the next request from the client, by specifying the throttle time in the fetch response (Ref.) and muting the channel.
    • Allows us to throttle only the misbehaving clients and not all clients (including polite clients)
    • This approach is rejected because it also throttles fetch requests for topic partitions that do not need remote data.

