Design goal:

The goal of this design is to get us started with the problem. It's not meant to be comprehensive; it's intended to provide a simple solution to a common problem introduced by producers.

Problem:

When the produced data exceeds a certain threshold, I/O performance in the brokers starts to degrade. This means that the latency of all requests will increase, which will impact real-time applications that are sensitive to latency.

High level design:

One way to address this problem is to set a global bytesIn threshold per broker. If that threshold is exceeded, the broker will start dropping produced data until the observed bytesIn rate falls below the threshold. The question remains which data to drop when the threshold is exceeded. In a shared Kafka cluster, some topics are likely more important than others, so it makes sense to also have a per-topic bytesIn threshold. When the global bytesIn threshold is exceeded, the broker will start dropping produced data on topics whose observed bytesIn rate exceeds their per-topic threshold.

Low level design:

1. Introduce a new broker-level config total.bytes.in.per.sec.threshold. Each topic can define a topic-level config bytes.in.per.sec.threshold. A change to bytes.in.per.sec.threshold will be rejected if the sum of all explicitly specified bytes.in.per.sec.threshold values exceeds total.bytes.in.per.sec.threshold. For a topic without an explicitly defined bytes.in.per.sec.threshold, its threshold is computed as: (total.bytes.in.per.sec.threshold - sum(explicitly specified bytes.in.per.sec.threshold)) / (# topics without an explicitly defined bytes.in.per.sec.threshold). This value is recomputed every time the bytes.in.per.sec.threshold of a topic changes or a new topic is added to the broker, and it can probably be maintained inside LogManager.
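
For illustration, a minimal sketch of how this implicit per-topic threshold could be recomputed (the helper name and structure are hypothetical, not part of the actual LogManager):

    object TopicThresholds {
      // The unclaimed portion of the broker-wide threshold is split evenly among
      // topics that do not define bytes.in.per.sec.threshold explicitly.
      def implicitTopicThreshold(totalThreshold: Long,
                                 explicitThresholds: Map[String, Long],
                                 allTopics: Set[String]): Long = {
        val unspecified = allTopics.size - explicitThresholds.size
        if (unspecified == 0) 0L
        else (totalThreshold - explicitThresholds.values.sum) / unspecified
      }
    }

    // e.g. total = 100 MB/s, topicA explicitly capped at 40 MB/s, topics B and C
    // unspecified => B and C each get (100 - 40) / 2 = 30 MB/s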

2. We are already monitoring the global and per-topic bytesIn rates using a Meter. Meter exposes a OneMinuteRate, which is an EWMA refreshed every 5 seconds by default. This should be good enough for measuring the observed bytesIn rate.
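
As a concrete sketch (assuming the Yammer Metrics 2.x Meter that the broker already uses; the meter and method names below are only for illustration):

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.Metrics

    object BytesInRateExample {
      // Register a meter and read its one-minute EWMA as the observed bytesIn rate.
      val bytesInRate = Metrics.newMeter(getClass, "BytesInPerSec", "bytes", TimeUnit.SECONDS)

      def onProduce(messageSetSizeInBytes: Long): Unit = {
        bytesInRate.mark(messageSetSizeInBytes)     // record the produced bytes
        val observed = bytesInRate.oneMinuteRate()  // EWMA, ticked every 5 seconds by default
        println("observed bytesIn rate: " + observed + " bytes/sec")
      }
    }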

3. Introduce TopicQuota.

import com.yammer.metrics.core.Meter

object TopicQuota {
  // Admits the incoming data unless both the global and the per-topic bytesIn
  // thresholds are currently exceeded. Assumes the Yammer Metrics Meter already
  // used by the broker (record = mark(), observed rate = oneMinuteRate()).
  def recordQuota(observedTotalRate: Meter, expectedTotalRate: Long,
                  observedTopicRate: Meter, expectedTopicRate: Long,
                  incomingDataSize: Long): Boolean = {
    if (observedTotalRate.oneMinuteRate() > expectedTotalRate &&
        observedTopicRate.oneMinuteRate() > expectedTopicRate)
      return false

    observedTotalRate.mark(incomingDataSize)
    observedTopicRate.mark(incomingDataSize)
    true
  }
}

4. In Log.append(), we do

    if (TopicQuota.recordQuota(...))
      append to log
    else
      throw QuotaExceededException

Discussions:

1. Currently, messages rejected due to MessageSizeTooLargeException are included in the bytesIn rate. We should probably use the bytesIn rate to measure only the amount of data actually getting into the log. We can introduce two other meters to measure the error bytesIn rate (e.g. due to MessageSizeTooLargeException) and the throttled bytesIn rate (due to quota). We probably don't need topic-level rates for the latter two. Instead, we can just log the dropped requests and the associated reason in log4j.
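
To make this concrete, a rough sketch of the two extra meters (names and registration are illustrative only; the existing bytesIn meter would then count only data that actually reaches the log):

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.Metrics

    object BytesInAccounting {
      val errorBytesInRate     = Metrics.newMeter(getClass, "ErrorBytesInPerSec", "bytes", TimeUnit.SECONDS)
      val throttledBytesInRate = Metrics.newMeter(getClass, "ThrottledBytesInPerSec", "bytes", TimeUnit.SECONDS)

      // Marked when a message set is rejected, e.g. due to MessageSizeTooLargeException.
      def markError(size: Long): Unit = errorBytesInRate.mark(size)

      // Marked when a message set is dropped because the quota was exceeded; the
      // dropped request and its reason are also logged in log4j, so no per-topic
      // meters are needed for these two rates.
      def markThrottled(size: Long): Unit = throttledBytesInRate.mark(size)
    }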


4 Comments

    1. >> A change in bytes.in.per.sec.threshold will be rejected if the sum of all specified bytes.in.threshold exceeds total.bytes.in.per.sec.threshold.
      There are a couple of ways to interpret this - will each broker check its total threshold against the sum of all topics in the cluster? Or does each broker check its total threshold against only the topics for which it hosts some partitions? The former is simpler and makes sense from a validation standpoint.
    2. Add an IO bytes-in rate and an IO messages-in rate. These should be different from the all-bytes-in rate and all-messages-in rate.
    3. Explicitly add throttled messages rate and throttled byte rate. Also makes sense to add throttled bytes/messages rate per topic.
    4. Like your idea about adding error messages/bytes rate.
    5. The change to the producer that will help here is to not retry blindly on QuotaExceededException. Though some sophistication will be required to know when to remove the backoff and return to the normal production rate.
  1. What should we do when creating new topics with/without a specified quota? One way is that if the new topic has a specified quota, we also need to check whether its quota exceeds total - sum_existing_quotas, and then update the quotas of the other topics without a specified quota; if it does not have a specified quota, we always accept it while updating the other topics without a specified quota. But this does not sound fair to topics without specified quota numbers.

  2. Thanks for writing this up. Let me know if I misunderstood the main issue here, but I'm somewhat wary of going with a per-topic/global byte rate threshold on a topic basis. It seems it is quite easy to unfairly starve clients that send reasonable volumes of data to an otherwise high-volume topic. (I think Guozhang was trying to raise a similar point.) Also, it seems like it would end up being a lot of tedious hand-work in deciding what the per-topic byte rates should be and something that would need to be revisited often. Furthermore, misconfigurations at this level could have a wide impact across all connected clients (~ the starvation problem).

    Can we consider an alternative implementation by doing per-client throttling? Although that won't guarantee satisfying a topic-level bytes-in threshold, I think it would be a more fair solution for clients and offer reasonably good throttling on a per-topic basis. Maintaining a per-IP incoming byte rate EWMA should not take a large memory footprint - I would expect on the order of thousands of producers (at most) in a large production setting. Although this does not address the second concern I mentioned (i.e., hand work in per-IP overrides - I would strongly prefer a self-tuned system), misconfigurations, if any, would have a much more limited impact (i.e., just to those IPs).

    I understand that the goal is to effectively avoid the problems of having a sudden flood of data to some topics in order to reduce any impact to real time applications that are sensitive to the latency. However, I think per-client throttling addresses this in a slightly more general and safer manner that at the end of the day will be easier to configure and use.

  3. Thanks for the comments. Whether we have a per topic or a per client quota, it's always hard to figure out the right value for every topic/client. Another problem is that it's possible that many topics/clients have increased traffic. Individually, none of them exceeds the quota. However, the aggregated increased traffic degrades the performance of the broker. These are the main problems that this design is trying to address.

    1. Have a total bytesIn threshold. Throttling happens, and only happens, when this limit is exceeded.

    2. The admin only needs to specify the quota for the topics that need to be protected. The remaining topics each get an equal share of the remaining quota.

    3. The question remains what we should do if sum(topic level quota) is larger than the total quota. This is a bit tricky. The topic-level config is really per broker, so it can happen that sum(topic level quota) is larger than the total quota on one broker but not on another. It would be weird to reject the config on one broker and accept it on another. One way is to always accept the topic-level config and allow sum(topic level quota) to be larger than the total quota. The implication is that throttling may not bring the load below the total quota.