Table of Contents

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-15265

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The Remote Log Manager a.k.a. RLM is one of the critical components in Kafka that helps support tiered storage. One of its responsibilities is to periodically upload rolled-over log segments from the local disk to the remote storage. It discovers the log segments eligible to be uploaded and then uploads them. RLM creates one such task (RLMTask) for each topic partition managed by the broker and schedules them to run at a fixed frequency (of 30 secs) using a ScheduledThreadPoolExecutor.

When an RLMTask identifies a list of segments to upload, it tries to upload all segments in the same run. As a result, if there is a huge number of eligible segments, the RLM task processes them as quickly as possible. Since multiple such tasks run concurrently within the thread-pool executor, they end up consuming significant CPU, thereby affecting producer latencies. This phenomenon is often noticed when we enable tiered storage for existing topics on a cluster. 

...

Similarly, the RLM also plays a role in serving read requests for log segments residing in the remote storage. When receiving a request for remote read, the RLM submits an async read task to its internal thread pool executor. The async task reads data from the remote storage, which is then sent back in the fetch response. 

A large number of read requests for data residing on the remote storage could cause degradation of the remote storage. A large number of read requests may also cause over-utilization of CPU on the broker. This may happen when the majority of the consumers start reading from the earliest offset of their respective Kafka topics. To prevent such degradation, we should have a mechanism to control the rate at which log segments are read from the remote storage. 

Goals

  • The administrator should be able to configure an "upper bound" on the rate at which log segments are uploaded to the remote storage.
  • The administrator should be able to configure an "upper bound" on the rate at which log segments are read from the remote storage.

...

  • remote.log.manager.write.quota.default
    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated to remote upload of segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.write.quota.window.num

    • Type: Integer
    • Mode: Static 
    • Description: The number of samples to retain in memory for remote upload quotas
    • Default value: 61
  • remote.log.manager.write.quota.window.size.seconds

    • Type: Integer
    • Mode: Static
    • Description: The duration of each sample for remote upload quotas
    • Default value: 1
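
For illustration, the two window settings above are static broker properties, while the quota itself is a cluster-wide dynamic default that can be adjusted at runtime with kafka-configs.sh. The values below are only examples (50 MB/s):

# server.properties (static)
remote.log.manager.write.quota.window.num=61
remote.log.manager.write.quota.window.size.seconds=1

# cluster-wide dynamic default for the remote write quota
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type brokers --entity-default \
  --add-config remote.log.manager.write.quota.default=52428800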

...

  • remote.log.manager.read.quota.default

    • Type: Long
    • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
    • Description: The default bytes per second quota allocated for reading remote log segments
    • Default value: Long.MAX_VALUE (unlimited)
  • remote.log.manager.read.quota.window.num

    • Type: Integer
    • Mode: Static
    • Description: The number of samples to retain in memory for remote read quotas
    • Default value: 11
  • remote.log.manager.read.quota.window.size.seconds
    • Type: Integer
    • Mode: Static
    • Description: The duration of each sample for remote read quotas
    • Default value: 1

...

This is an enhancement to the existing mechanism for reading/writing log segments from/to the remote storage. In the absence of a throttling mechanism, the remote storage can degrade when a large number of fetch requests request data from the remote storage or when a large volume of writes happens to the remote storage. Also, without a mechanism to throttle writes to remote storage, the CPU on the brokers would become overly busy, causing degradation of the Kafka cluster and impacting producer latencies.

Throttling Data Writes

We add a new component, RLM WriteQuotaManager, to manage quotas for remote writes. It is similar to other existing quota managers (e.g. ClientQuotaManager). It can be configured with the permitted write quota. It keeps track of the current usage and can be used to check if the quota is exhausted. We will use it to record the segment size when a segment is uploaded to the remote storage, so that it can track the rate of bytes uploaded. We can also use it to check whether the current upload rate is within the quota before uploading new segments to the remote storage.

RLM WriteQuotaManager

The WriteQuotaManager allows users to specify the following properties:

  • Write Quota in bytes per second (default: infinite) 
  • Number of samples to retain in memory (default: 61)
  • The timespan of each sample (default: 1 second)

The QuotaManager utilizes a rolling window to track the bytes uploaded every second. There is a time bucket for each sample and it stores the total bytes uploaded in the time bucket.

T1 | T2 | ... | T60 | T61

61 buckets of size 1 second (default): 60 whole buckets plus one additional bucket to track usage for the current (partial) window.


QuotaManager supports the following two operations:

  • Record the uploaded bytes - The bucket corresponding to the current time is updated with the bytes uploaded. The bucket holds the total bytes of data uploaded in the associated time bucket.
  • Check if the write quota has been exceeded - It computes the average bytes uploaded per second across all time buckets and checks if the average is more than the specified quota. For example, if the timespan of each bucket is 1 second, the average across all buckets gives the average upload rate in bytes/sec, which can be compared with the quota specified in bytes/sec (see the sketch below).
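
To make the two operations concrete, below is a minimal, self-contained sketch of a rolling-window byte-rate tracker. The class and method names are illustrative only; the actual implementation would presumably reuse Kafka's existing quota/metrics machinery rather than this hand-rolled class.

// Illustrative rolling-window byte-rate tracker; names and structure are hypothetical.
public class RollingWindowQuota {
    private final long quotaBytesPerSec;   // permitted bytes per second
    private final int numSamples;          // e.g. 61 buckets for writes, 11 for reads
    private final long sampleMillis;       // e.g. 1000 ms per bucket
    private final long[] bucketBytes;      // bytes recorded in each bucket
    private final long[] bucketStart;      // start timestamp of each bucket

    public RollingWindowQuota(long quotaBytesPerSec, int numSamples, long sampleMillis) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.numSamples = numSamples;
        this.sampleMillis = sampleMillis;
        this.bucketBytes = new long[numSamples];
        this.bucketStart = new long[numSamples];
    }

    // Operation 1: record uploaded bytes in the bucket corresponding to the current time.
    public synchronized void record(long bytes, long nowMs) {
        int idx = (int) ((nowMs / sampleMillis) % numSamples);
        long windowStart = (nowMs / sampleMillis) * sampleMillis;
        if (bucketStart[idx] != windowStart) {   // bucket belongs to an older window, reset it
            bucketStart[idx] = windowStart;
            bucketBytes[idx] = 0;
        }
        bucketBytes[idx] += bytes;
    }

    // Operation 2: average the recorded bytes per second across all buckets and compare with the quota.
    public synchronized boolean isQuotaExceeded(long nowMs) {
        long total = 0;
        for (int i = 0; i < numSamples; i++) {
            if (nowMs - bucketStart[i] < (long) numSamples * sampleMillis) {   // skip expired buckets
                total += bucketBytes[i];
            }
        }
        double avgBytesPerSec = total / (numSamples * sampleMillis / 1000.0);
        return avgBytesPerSec > quotaBytesPerSec;
    }
}

With the proposed defaults, a write-side tracker would be created as new RollingWindowQuota(writeQuotaBytes, 61, 1000) and a read-side tracker as new RollingWindowQuota(readQuotaBytes, 11, 1000).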

The WriteQuotaManager is shared by all the threads of the ScheduledThreadPoolExecutor that is used to execute RLMTasks and keeps track of the total segment upload rate on the broker. The RLMTask will do the following to ensure the remote write quota specified for the cluster is respected:

  • In each run, RLMTask identifies all the log segments for the given topic partition that are eligible for upload. The task then attempts to upload the segments to remote storage in a loop.
  • Before uploading the next log segment, it checks whether the write quota has already been violated. If the quota has not been violated, it first uploads the log segment to the remote storage and then records the number of bytes uploaded with the WriteQuotaManager. Thus, the quota manager always has an updated view of quota utilization.
  • If the quota is already exhausted, the RLMTask waits until the write rate falls below the specified quota. Once the write rate falls, the task uploads the segment, records the number of bytes uploaded with the WriteQuotaManager, and moves on to the next segment.
  • This approach may cause starvation for low-throughput topics, since the RLM task for high-throughput topics may not give up the thread (the task waits until the write rate falls below the quota). Starvation may not be a problem, because RLM is still running at maximum capacity to offload segments to remote storage, thus preventing the local disk from growing. However, if fairness is desirable, the RLM task should exit if it runs into a 'quota exceeded' error and it has uploaded at least one segment in its run. This will allow other RLM tasks a chance to be executed. They may run into the same error but will run once the quota utilization subsides. (A sketch of this loop follows the list.)
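
The sketch below shows the shape of this upload loop under the write quota. The interfaces and names are hypothetical stand-ins for RLM internals (reusing the RollingWindowQuota sketch above), not the actual Kafka code.

import java.util.List;

// Illustrative sketch of the throttled upload loop; the types below are placeholders.
public class ThrottledSegmentUploader {
    public interface Segment { long sizeInBytes(); }
    public interface RemoteStorage { void copySegment(Segment segment); }

    private final RollingWindowQuota writeQuota;
    private final RemoteStorage remoteStorage;
    private final long pollIntervalMs;

    public ThrottledSegmentUploader(RollingWindowQuota writeQuota, RemoteStorage remoteStorage, long pollIntervalMs) {
        this.writeQuota = writeQuota;
        this.remoteStorage = remoteStorage;
        this.pollIntervalMs = pollIntervalMs;
    }

    public void uploadEligibleSegments(List<Segment> eligibleSegments) throws InterruptedException {
        for (Segment segment : eligibleSegments) {
            // Block until the broker-wide write rate falls back under the quota. For the fairness
            // variant, the task would instead return here once it has uploaded at least one segment.
            while (writeQuota.isQuotaExceeded(System.currentTimeMillis())) {
                Thread.sleep(pollIntervalMs);
            }
            remoteStorage.copySegment(segment);   // upload the segment to remote storage
            // Record the uploaded bytes so the quota manager keeps an up-to-date view of utilization.
            writeQuota.record(segment.sizeInBytes(), System.currentTimeMillis());
        }
    }
}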

An RLMTask is also responsible for handling expired remote log segments for the associated topic partition. It cleans up the remote log segments that have expired and are no longer required. With the change in behavior to block when the remote write quota is exhausted, clean-up of expired segments may stall if the segment upload rate across the cluster is high enough to cause excessive throttling. To solve this problem, we can break the RLMTask down into two smaller tasks: one for segment upload and the other for handling expired segments. The two tasks shall be executed in separate ThreadPoolExecutors, as sketched below. This removes the impact of segment-upload throttling on segment expiration.
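
A minimal sketch of that split, assuming two plain ScheduledThreadPoolExecutors; the pool sizes, interval, and task bodies are placeholders:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Copy and expiration run on separate executors, so a copy task blocked on the write quota
// can no longer delay clean-up of expired remote segments.
public class RlmTaskScheduling {
    public static void main(String[] args) {
        ScheduledExecutorService copyExecutor = Executors.newScheduledThreadPool(10);
        ScheduledExecutorService expirationExecutor = Executors.newScheduledThreadPool(10);

        Runnable copyTask = () -> { /* identify eligible segments and upload them under the write quota */ };
        Runnable expirationTask = () -> { /* delete remote segments that have breached retention */ };

        copyExecutor.scheduleWithFixedDelay(copyTask, 0, 30, TimeUnit.SECONDS);
        expirationExecutor.scheduleWithFixedDelay(expirationTask, 0, 30, TimeUnit.SECONDS);
    }
}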

...


Throttling Data Reads

We shall add a new component, RLM ReadQuotaManager, to manage quotas for remote reads. It is similar to the WriteQuotaManager discussed in the previous section, but is meant for tracking reads instead of writes. It can be configured with the permitted read quota. It can then be used to record the quota consumed following a data read and also to check if the quota has already been exceeded.

RLM ReadQuotaManager

The ReadQuotaManager allows users to specify the following properties:

  • Read Quota in bytes per second (default: infinite) 
  • Number of samples to retain in memory (default: 11)
  • The timespan of each sample (default: 1 second)

When a fetch request requires remote data for a topic partition, the ReplicaManager will query the ReadQuotaManager to check whether the quota for reading remote data has already been exhausted. If the quota has not been exhausted, the ReplicaManager will continue to return the RemoteStorageFetchInfo that can be used to read the requested data from the remote storage. Once data from the remote storage is read, RLM records the number of bytes read with the ReadQuotaManager. Hence, the quota manager always has an updated view of quota utilization.

However, if the quota has already been exhausted, the ReplicaManager returns an empty RemoteStorageFetchInfo. As a result, the fetch request can still read data for the other topic partitions, but there will be no data for the topic partition requiring remote data as long as the read quota remains exhausted. Once the read quota consumption rate falls below the permitted quota, fetch requests can resume serving remote data.
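
To illustrate where the quota check and the recording sit in the fetch path, here is a simplified sketch; the class and method names are stand-ins (again reusing the RollingWindowQuota sketch), not the actual ReplicaManager/RemoteLogManager APIs.

// Illustrative placement of the remote read quota check in the fetch path.
public class RemoteFetchThrottle {
    private final RollingWindowQuota readQuota;

    public RemoteFetchThrottle(RollingWindowQuota readQuota) {
        this.readQuota = readQuota;
    }

    // Checked before building the RemoteStorageFetchInfo: if false, the partition is
    // returned empty in the fetch response until the read rate drops below the quota.
    public boolean mayServeRemoteFetch() {
        return !readQuota.isQuotaExceeded(System.currentTimeMillis());
    }

    // Called by the remote read task after data has been read from remote storage.
    public void onRemoteBytesRead(long bytesRead) {
        readQuota.record(bytesRead, System.currentTimeMillis());
    }
}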

It is worth noting that a rogue client could try to read large amounts of data from remote storage, exhausting the broker-level remote read quota. Once the quota is exhausted, all remote reads will be throttled, which will prevent even polite clients from reading remote data. To address this, we can set client-level fetch quotas (which should be lower than the broker-level remote read quota). This will prevent the rogue client from reading large amounts of remote data, which in turn will prevent the throttling of remote reads for other clients.
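
For example, an administrator could cap a specific client using the existing client-level quota mechanism (the client id and byte rate below are placeholders):

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type clients --entity-name <client-id> \
  --add-config 'consumer_byte_rate=10485760'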

Test Plan

Unit tests for both read and write quotas

Rejected Alternatives

Throttling Data Writes

In this section, we discuss some other approaches for throttling remote writes.

  • We can decrease the pool size for the ThreadPoolExecutor. This would prevent too many concurrent uploads from running, thus preventing over-utilization of CPU.

Pros:

    • Simplicity. No new feature needs to be built. We will only need to adjust the thread pool size which can be done dynamically.

Cons:

    • This approach relies on reducing concurrency to achieve a lower upload speed. In practice, we would know the maximum upload rate our remote storage can support, but it is not straightforward to translate that into a number of upload threads, and doing so requires a trial-and-error approach.
    • Reducing concurrency can also introduce unfairness while uploading segments. When an RLMTask runs for a topic partition, it uploads all the eligible log segments in the same run, preventing uploads for other topic partitions. This can cause delays and lag buildup for other topic partitions, particularly when the thread pool size is small. If some topics have a high message-in rate, the corresponding RLMTasks would end up using all threads in the threadpool, preventing uploads for smaller topics.
  • We could use the QuotaManager differently. Instead of tracking the global upload rate, we could track the upload rate per thread in the writer threadpool. Each thread records its upload speed with the quota manager and checks for quota violations before uploading the next log segment.

Cons:

    • Each thread only gets a fraction of the global quota, which will lead to under-utilization of resources. For example, if the global quota is 100 MBps and the threadpool size is 10, each thread can never upload at more than 10 MBps. But if some topic partitions have a message-in rate higher than 10 MBps, the upload rate can never catch up with the message-in rate even though the overall upload rate is below 100 MBps.

The alternate architectures are rejected for the disadvantages discussed above.

Throttling Data Reads

In this section, we discuss some other approaches for throttling remote reads.

  • We can decrease the threadpool size for the ThreadPoolExecutor. This would prevent too many concurrent reads.

Pros:

    • Simplicity. No new feature needs to be built. We will only need to adjust the thread pool size which can be done dynamically.

Cons:

    • It requires a trial-and-error approach to set the threadpool size appropriately so as not to exceed a certain read rate from the remote storage.
    • The setting is dependent on the broker hardware and needs to be tuned accordingly.
  • We could use the QuotaManager differently. It will be used by the worker threads in the reader threadpool to check whether the read quota has been exceeded before executing each read task. If the quota isn't exceeded, the read task is processed; else the read task must wait. This can be implemented in two ways:

    • The read task computes the throttle time using the quota framework and sleeps for that amount of time before executing the request. When the task wakes up, the read rate would have fallen within the specified quota. The downside is that the threads of the threadpool will always look busy even if they are just waiting.
    • Instead of waiting, the read task can be deferred and re-queued to run after the throttle time. For example, instead of using a LinkedBlockingQueue in the ThreadPoolExecutor, we could use a DelayQueue and re-queue the read task with the computed throttle time as the delay.
      • DelayQueues are unbounded, so we would have to override it to make it bounded.
      • ThreadPoolExecutor needs a BlockingQueue<Runnable>, hence we can't use a DelayQueue with it directly.

The drawback of the above two approaches is that if the remote read quota has been exhausted, the RLM will keep accepting more read tasks to be executed later. The fetch request corresponding to the read task may have already timed out by the time the read task gets executed. This will lead to a waste of resources for the broker. 

  • We extend the quota framework to support client-level read quotas. We could define both request and bandwidth quotas per client for remote storage read requests. If the client violates any of the configured quotas, we throttle the next request from the client by specifying the throttle time in the fetch response (Ref.) and muting the channel.
    • This allows us to throttle only the misbehaving clients and not all clients (including polite clients).
    • However, this approach is rejected because it prevents the client from reading any partition, even though it may not be requesting data for a partition with remote data.

The alternate architectures are rejected for the disadvantages discussed above.