Status

Current state: "Draft"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The motivation here is similar to that of KIP-854: Separate configuration for producer ID expiry. Idempotent producers became the default in Kafka with KIP-679: Producer will enable the strongest delivery guarantee by default. As a result, every producer instance is assigned a producer ID (PID) unless the client configuration specifies otherwise. The growing number of PIDs stored in Kafka brokers exposes a broker to OOM errors when it serves a large number of producers or rogue or misconfigured clients. When this happens the broker hits OOM and goes offline, and the only way to recover is to increase the heap size.

KIP-854 added a separate config to expire PIDs independently of transactional IDs; however, the broker is still exposed to OOM if it accumulates a large number of PIDs before `producer.id.expiration.ms` is exceeded. Decreasing the value of `producer.id.expiration.ms` affects all clients, which is not always desirable. It would be more beneficial to target only inefficient users and stop them from crowding the map of PIDs.


This KIP proposes to throttle the number of PIDs at the leader of the partition by adding a new rate quota that is applied while handling the PRODUCE request. This way the broker can reject only risky users and protect itself without impacting everyone else.


Proposed Changes

We propose adding a new QuotaManager called ProducerIdQuotaManager at the PRODUCE request level in the Kafka API that limits the number of active PIDs per user (KafkaPrincipal). The number of active PIDs will be defined as a rate within a period of time (similar to the ControllerMutation quota).


 ProducerIdQuotaManager will

  • be applied per KafkaPrincipal, as the set of principals is smaller than the set of ClientIds (which the client config does not enforce) or of KafkaPrincipal and ClientId combinations.
  • keep a cache that maps each user (KafkaPrincipal) to its unique active PIDs. The cache will be implemented using a simple, time-controlled bloom filter to avoid any unwanted growth that might cause OOM.
  • add rate metrics that are incremented whenever the caching layer doesn't contain the PID. The user will be throttled once the allowed quota is reached.
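The check described in the bullets above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: the class name `ProducerIdQuotaSketch`, the method `shouldThrottle`, the plain `HashSet` standing in for the bloom filter, and the fixed-window counter standing in for the rate metric are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: charge a user only for PIDs not seen before, throttle when over quota. */
final class ProducerIdQuotaSketch {
    private final int maxNewPidsPerWindow;
    private final long windowMs;
    // Assumption: a HashSet stands in for the time-controlled bloom filter.
    private final Map<String, Set<Long>> seenPids = new HashMap<>();
    private final Map<String, Integer> newPidCount = new HashMap<>();
    private final Map<String, Long> windowStart = new HashMap<>();

    ProducerIdQuotaSketch(int maxNewPidsPerWindow, long windowMs) {
        this.maxNewPidsPerWindow = maxNewPidsPerWindow;
        this.windowMs = windowMs;
    }

    /** Returns true if a produce request from this principal with this PID should be throttled. */
    boolean shouldThrottle(String principal, long pid, long nowMs) {
        long start = windowStart.getOrDefault(principal, nowMs);
        if (nowMs - start >= windowMs) {
            // A new window begins: forget the previous window's state for this user.
            seenPids.remove(principal);
            newPidCount.remove(principal);
            start = nowMs;
        }
        windowStart.put(principal, start);

        Set<Long> seen = seenPids.computeIfAbsent(principal, p -> new HashSet<>());
        if (seen.contains(pid)) {
            return false; // Known PID: it does not count against the quota again.
        }
        int used = newPidCount.getOrDefault(principal, 0);
        if (used >= maxNewPidsPerWindow) {
            return true; // Quota exhausted: reject the new PID for this user only.
        }
        newPidCount.put(principal, used + 1);
        seen.add(pid);
        return false;
    }
}
```

With a quota of 2 new PIDs per window, a user's third distinct PID within the window is throttled, while a repeated PID or a different user is unaffected.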

Cache Active PIDs per KafkaPrincipal

The cache will be represented as a map of KafkaPrincipal to a time-controlled bloom filter. The lifecycle of a user's PIDs in the bloom filter in the caching layer will be as follows:

  • Step 1: Adding the first PID for a user will create a bloom filter for this user in the cached map (let's call it bloom_filter_1).
    • Any new PIDs will be added to this filter within producer.id.quota.window.size.seconds.
  • Step 2: A new bloom filter will be created alongside the old one for the user once we exceed producer.id.quota.window.size.seconds (let's call it bloom_filter_2).
    • All new PIDs from this point on will be added to the new filter.
    • Both bloom filters will be used to check whether we have come across the same PID before.
  • Step 3: The old bloom filter (bloom_filter_1) will be disposed of once we reach 1.5 x producer.id.quota.window.size.seconds, leaving only bloom_filter_2.
    • From this point the cache will use only bloom_filter_2 until the next producer.id.quota.window.size.seconds starts.
  • Step 4: Repeat steps 2 and 3 on bloom_filter_2 once we reach the next producer.id.quota.window.size.seconds.

A user will be removed from the caching layer entirely once it no longer has any active bloom filters attached to it.
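The lifecycle steps above can be sketched as a small, self-contained class. This is an illustration under stated assumptions rather than the proposed implementation: the name `TimedPidFilterSketch`, the explicit `nowMs` arguments (used instead of a scheduler so the behaviour is deterministic), the `BitSet`-backed filter, and the SplitMix64-style hash mixing (used here in place of MurmurHash3) are all hypothetical.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the per-user, time-controlled bloom filter lifecycle. */
final class TimedPidFilterSketch {
    private final long windowMs;
    private final int numBits;
    private final int numHashes;
    // Creation time (ms) -> bloom filter bits; at most two filters are live at any time.
    private final TreeMap<Long, BitSet> filters = new TreeMap<>();

    TimedPidFilterSketch(long windowMs, int numBits, int numHashes) {
        this.windowMs = windowMs;
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** Steps 1, 2 and 4: add to the newest filter, creating a new one once the window has elapsed. */
    void put(long pid, long nowMs) {
        dispose(nowMs);
        Map.Entry<Long, BitSet> newest = filters.lastEntry();
        if (newest == null || nowMs - newest.getKey() >= windowMs) {
            filters.put(nowMs, new BitSet(numBits));
        }
        BitSet target = filters.lastEntry().getValue();
        for (int i = 0; i < numHashes; i++) {
            target.set(index(pid, i));
        }
    }

    /** Checks every live filter; may return a false positive but never a false negative. */
    boolean mightContain(long pid, long nowMs) {
        dispose(nowMs);
        for (BitSet bits : filters.values()) {
            boolean allSet = true;
            for (int i = 0; i < numHashes && allSet; i++) {
                allSet = bits.get(index(pid, i));
            }
            if (allSet) {
                return true;
            }
        }
        return false;
    }

    /** Step 3: drop every filter that is at least 1.5 x the window old. */
    void dispose(long nowMs) {
        filters.entrySet().removeIf(e -> nowMs - e.getKey() >= windowMs * 3 / 2);
    }

    /** A user with no live filters can be removed from the cache. */
    boolean isEmpty() {
        return filters.isEmpty();
    }

    int liveFilters() {
        return filters.size();
    }

    // Double hashing: the i-th bit index is derived from two mixed hashes of the PID.
    private int index(long pid, int i) {
        long h1 = mix(pid);
        long h2 = mix(pid ^ 0x9E3779B97F4A7C15L);
        return (int) Math.floorMod(h1 + i * h2, (long) numBits);
    }

    private static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }
}
```

With a 1000 ms window, a PID added at t=0 lands in the first filter, a PID added at t=1000 creates the second filter, the first filter is dropped at t=1500, and the user has no live filters left at t=2500 if nothing else is added.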

Public Interfaces

New Broker Configurations

We propose to introduce the following new configuration to the Kafka broker: 

Name | Type | Default | Description
producer.id.quota.window.num | Int | 11 | The number of samples to retain in memory for producer id quotas.
producer.id.quota.window.size.seconds | Int | 1 | The time span of each sample for producer id quotas.
producer.id.quota.cache.cleanup.scheduler.interval.ms | Int | 10 | The frequency in ms at which the producer id quota manager checks for disposed cached windows.

New Quota Types

We propose to introduce the following new quota type in the Kafka broker:

Name | Type | Default | Description
producer_ids_rate | Double | Long.MaxValue | The rate at which produce requests are accepted with a new producer id.

The config will be supported for <user> only, as we are trying to avoid growth of the caching layer, and users are a known quantity to the operator of the cluster and can be controlled more easily than client-ids.

  • Extend QuotaConfigs to handle the new quota type
import org.apache.kafka.common.config.ConfigDef;

public class QuotaConfigs {
    public static final String PRODUCER_ID_RATE_OVERRIDE_CONFIG = "producer_ids_rate";
    public static final String PRODUCER_ID_RATE_DOC = "A rate representing the upper bound of active producer ids.";

    // Registers the new quota as a DOUBLE config with an effectively unlimited default.
    public static ConfigDef buildProducerIdsConfig(ConfigDef configDef) {
        configDef.define(PRODUCER_ID_RATE_OVERRIDE_CONFIG, ConfigDef.Type.DOUBLE,
            Integer.valueOf(Integer.MAX_VALUE).doubleValue(), ConfigDef.Importance.MEDIUM, PRODUCER_ID_RATE_DOC);
        return configDef;
    }
}
  • Extend `DynamicConfig` and `ClientQuotaControlManager.configKeysForEntityType` to handle the new quota.

New Broker Metrics

The following new metrics will be exposed by the broker:

Group | Name | Tags | Description
ProducerIds | rate | user | The current rate.
ProducerIds | tokens | user | The remaining tokens in the bucket. A value < 0 indicates that throttling is applied.
ProducerIds | throttle-time | user | Tracks the average throttle time per user.

Client Errors

The new quota type will use QuotaViolationException similar to ClientQuotaManager 

New TimeControlledBloomFilter

import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

class TimedBloomFilter[T](numberOfItems: Int, falsePositiveRate: Double, disposalSchedulerIntervalMs: Long, quotaWindowSizeSeconds: Long, scheduler: Scheduler) {
  // Maps each bloom filter's creation time to the filter itself
  val bloomFilters: ConcurrentHashMap[Long, SimpleBloomFilter[T]] = new ConcurrentHashMap()

  def put(value: T): Unit = {
    // Will choose the right bloom filter to use
  }

  def mightContain(value: T): Boolean = {
    // Will check all available bloom filters
    ???
  }

  scheduler.schedule("dispose-old_bloom-filter", () => {
    // Dispose of any bloom filter that is older than 1.5 x quotaWindowSizeSeconds
  }, 0L, disposalSchedulerIntervalMs)
}

class SimpleBloomFilter[T](numberOfBits: Int, numberOfHashes: Int) {
  val bits = mutable.BitSet.empty

  def put(value: T): Unit = {
    // Will use MurmurHash3 to hash the value
  }

  def mightContain(value: T): Boolean = {
    // Will check whether every bit selected by the value's hashes is set
    ???
  }
}
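
TimedBloomFilter takes numberOfItems and falsePositiveRate, while SimpleBloomFilter takes numberOfBits and numberOfHashes. One plausible way to derive the latter from the former is the standard bloom filter sizing formulas, m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2. The sketch below assumes those formulas are what is intended; the class and method names are hypothetical and do not come from this KIP.

```java
/** Hypothetical helper: standard bloom filter sizing from expected items and target false positive rate. */
final class BloomFilterSizing {
    private BloomFilterSizing() { }

    /** m = ceil(-n * ln(p) / (ln 2)^2) */
    static int optimalNumBits(int numberOfItems, double falsePositiveRate) {
        double ln2 = Math.log(2);
        return (int) Math.ceil(-numberOfItems * Math.log(falsePositiveRate) / (ln2 * ln2));
    }

    /** k = round((m / n) * ln 2), with a minimum of one hash function. */
    static int optimalNumHashes(int numberOfItems, int numberOfBits) {
        return Math.max(1, (int) Math.round((double) numberOfBits / numberOfItems * Math.log(2)));
    }
}
```

For example, 1000 expected PIDs with a 1% false positive rate work out to 9586 bits (about 1.2 KB) and 7 hash functions per filter, which gives a feel for how the per-user memory cost scales with the quota.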

New ProducerIdQuotaManagerCache

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

class ProducerIdQuotaManager[K, V](disposalSchedulerIntervalMs: Long, cleanupScheduler: Scheduler) {
  protected val concurrentMap: ConcurrentHashMap[K, TimedBloomFilter[V]] = new ConcurrentHashMap()

  protected val schedulerIntervalMs = new AtomicLong(disposalSchedulerIntervalMs)

  cleanupScheduler.schedule("cleanup-keys", () => {
    // Clean up keys whose TimedBloomFilter is empty
  }, 0L, schedulerIntervalMs.get())

  // Updates the interval used by the cleanup scheduler
  def updateDisposalSchedulerIntervalMs(intervalMs: Long): Unit = {
    schedulerIntervalMs.set(intervalMs)
  }

  def add(key: K, value: V): ProducerIdQuotaManager[K, V] = {
    // Add the value to the key's bloom filter
    this
  }

  def containsKeyValuePair(key: K, value: V): Boolean = {
    // Check whether the key/value pair exists in the cache
    ???
  }
}

Tools

kafka-configs.sh will be extended to support the new quota.  A new quota property will be added, which can be applied to <user>:

  • producer_ids_rate: The number of active PIDs per quota window.

For example:

bin/kafka-configs  --zookeeper localhost:2181 --alter --add-config 'producer_ids_rate=50' --entity-name user1 --entity-type users

Default quotas for <user> can be configured by omitting entity name. For example:

bin/kafka-configs  --zookeeper localhost:2181 --alter --add-config 'producer_ids_rate=200' --entity-type users

Known Limitations

  • As we are using a bloom filter, we might get false positives.
  • Throttling based on user will punish every client used by the same user. However, this risk is similar to that of the existing quotas.

Compatibility, Deprecation, and Migration Plan

Compatibility with Old Clients

  • None, since we are using the same throttling from ClientQuota which the client knows how to handle.

Rejected Alternatives

  1. Limit the total active producer ID allocation number: This solution is the simplest; however, as stated in the motivation, the OOM is always caused by a rogue or misconfigured client, so this solution would punish good clients alongside the rogue one.
  2. Having a limit on the number of active producer IDs: The idea here is that if we had a misconfigured client, we would expire the older entries. This solution risks the idempotency guarantees. There is also a risk that we may end up expiring the PIDs of good clients, as there is no way to link a PID back to a specific client at this point.
  3. Allow clients to "close" the producer ID usage: This solution is better; however, it only improves the situation for new clients, leaving the broker exposed to OOM because of old producers. We may need to consider improving the producer client to include this, but not as part of the scope of this KIP.
  4. Throttle INIT_PRODUCER_ID requests: This solution might look simple; however, throttling INIT_PRODUCER_ID doesn't guarantee that the OOM won't happen, as
    1. INIT_PRODUCER_ID for idempotent producers requests PIDs from a random controller every time, so a client being throttled on one controller doesn't guarantee that it won't go through on the next controller, causing OOM at the leader later.
    2. The problem happens when the PID is activated by producing, not when it is initialised. So it's more effective to throttle at produce time.
  5. Throttle PIDs based on IPs: Similar to solution #1, we would end up punishing good users, especially if the misbehaving producer is deployed on a K8S cluster that hosts other use cases.