You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »


Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-480: Sticky Partitioner introduced a UniformStickyPartitioner and made it the default paritioner.  It turned out that despite being called UniformStickyPartitioner, the sticky partitioner is not uniform in a problematic way: it actually distributes more records to slower brokers and can cause "runaway" problems, when a temporary slowness of a broker skews distribution such that the broker gets more records and becomes slower because of that, which in turn skews distribution even more, and the problem is perpetuated.

The problem happens because the "stickiness" time is driven by the new batch creation, which is reciprocal to broker latency - slower brokers drain batches slower, so they get more of the "sticky" time than faster brokers, thus skewing the distribution.  The details of the scenario are described well here.

This is not the only problem; even when all brokers are uniformly fast, with linger.ms=0 and many brokers, the sticky partitioner doesn't create batches as efficiently.  Consider this scenario, say we have 30 partitions, each has a leader on its own broker.

  1. a record is produced, partitioner assigns to partition1, batch becomes ready and sent out immediately

  2. a record is produced, partitioner sees that a new batch is created, triggers reassignment, assigns to partition2, batch becomes ready and sent out immediately

  3. a record is produced, partitioner sees that a new batch is created, triggers reassignment, assigns to partition3, batch becomes ready and sent out immediately

and so on.  (The actual assignment is random, but on average we'd rotate over all partitions more or less uniformly.) Then it repeats the whole loop once again (the pattern will be the same because we allow 5 in-flight), and again, while it's doing that, the first on the first broker may complete, in which case, a single record batch may be ready again and so on.  This is probably not that big of a deal when the number of brokers is small (or to be precise, the number of brokers that happen to host partitions from one topic), but it's good to understand the dynamics.

So in some sense, the UniformStickyPartitioner is neither uniform nor sufficiently sticky.

Public Interfaces

A new producer config setting would be added: partitioner.sticky.batch.size.  The default would be 0, in which case the batch.size would be used (we can change the default to batch.max.size once KIP-782 is implemented).

Proposed Changes

To arrive at the idea of a strictly uniform sticky partitioner, we can consider ClassicDefaultPartitioner (it uses round-robin instead of random, but it's not important for this discussion).  ClassicDefaultPartitioner chooses a new partition for each 1 record.  This way, it achieves uniform distribution (at the record level), but batching is not very good for the same reason described earlier in the motivation section (as it goes through partitions, they become ready again and records are sent immediately).  Now, if instead of each 1 record we chose a new partition every N bytes, then the distribution would be as uniform (barring restarts) as in the classic partitioner, but also sticky because more consecutive records are directed to a partition, allowing it to create better batches.

Let's consider how the batching is going to be different with a strictly uniform sticky partitioner and linger.ms=0 and 30 partitions each on its own broker.

  1. a record is produced, partitioner assigns to partition1, batch becomes ready and sent out immediately

  2. a record is produced, partitioner is still stuck to partition1, batch becomes ready and sent out immediately

  3. same thing

  4. --

  5. --

  6. a record is produced, partitioner is still stuck to partition1, now we have 5 in-flight, so batching begins

The batching will continue until either an in-flight batch completes or we hit the N bytes and move to the next partition.  This way it takes just 5 records to get to batching mode, not 5 x number of partition records, and the batching mode will stay longer as we'll be batching while waiting for a request to be completed.  As the production rate accelerates, the logic will automatically switch to use larger batches to sustain higher throughput.

If one of the brokers has higher latency the records for the partitions hosted on that broker are going to form larger batches, but it's still going to be the same amount records sent less frequently in larger batches, the logic automatically adapts to that.

To summarize, the uniform sticky partitioner has the following advantages:

  1. It's uniform, which is simple to implement and easy to understand.  Intuitively, this is what users expect.

  2. It creates better batches, without adding linger latency on low production rate but switching to better batching on high production rate.

  3. It adapts to higher latency brokers, using larger batches to push data, keeping throughput and data distribution uniform.

  4. It's efficient (logic for selecting partitions doesn't require complex calculations).

Compatibility, Deprecation, and Migration Plan

  • No compatibility, deprecation, migration plan.  This fixes a problem with current implementation.
  • Users can continue to use their own partitioners--if they want to implement a partitioner that switches partitions based on batch creations, they can use the onNewBatch(String topic, Cluster cluster) method to implement the feature.

Rejected Alternatives

One potential disadvantage of strictly uniform distribution is that if one of the brokers is lagging behind (cannot sustain its share of throughput), the records will keep piling in the accumulator, and will eventually exhaust the buffer pool memory and slow down the production rate to match the capacity of the slowest broker.  We could abandon strict uniformity and send less data to slow brokers, but this could create potential downstream problems as the data distribution would be skewed at the consumer side as well.


  • No labels