...

  • keep a cache mapping each user (KafkaPrincipal) to its unique active PIDs. The cache will be implemented using a simple time-controlled bloom filter to avoid unbounded growth that might cause an OOM.
  • add a rate metric that is incremented whenever the caching layer doesn't already contain the PID. The user will be throttled once the allowed quota is reached.

Cache Active PIDs per KafkaPrincipal

The cache will be represented as a map of KafkaPrincipal to a time-controlled bloom filter. The lifecycle of a user's PIDs in the caching layer's bloom filter will be as follows:

...
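The lifecycle steps are elided above; as a rough sketch of the underlying idea (the two-slot rotation, constructor parameters, and method names below are assumptions, not the KIP's final design), a time-controlled bloom filter can rotate a pair of plain bloom filters so that no entry outlives the quota window:

Code Block
languagescala
import java.util.BitSet

// Hypothetical sketch of a time-controlled bloom filter: two plain bloom
// filters are rotated every half window, so an entry is forgotten after at
// most one full window and memory cannot grow without bound.
class TimedBloomFilter[V](bits: Int, windowMs: Long, now: () => Long = () => System.currentTimeMillis()) {
  private class Slot {
    val bitset = new BitSet(bits)
    val createdAt: Long = now()
  }
  private var current = new Slot
  private var previous: Option[Slot] = None

  // Two cheap hash positions per value; real filters use more/better hashes.
  private def positions(v: V): Seq[Int] =
    Seq(Math.floorMod(v.hashCode, bits), Math.floorMod(v.hashCode * 31 + 17, bits))

  def put(v: V): Unit = {
    maybeRotate()
    positions(v).foreach(i => current.bitset.set(i))
  }

  def mightContain(v: V): Boolean = {
    maybeRotate()
    val pos = positions(v)
    pos.forall(i => current.bitset.get(i)) ||
      previous.exists(p => pos.forall(i => p.bitset.get(i)))
  }

  def isEmpty: Boolean =
    current.bitset.isEmpty && previous.forall(_.bitset.isEmpty)

  // Rotate every half window; the slot dropped here is at least a window old.
  private def maybeRotate(): Unit =
    if (now() - current.createdAt >= windowMs / 2) {
      previous = Some(current)
      current = new Slot
    }
}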

We propose to introduce the following new configuration to the Kafka broker: 

Name | Type | Default | Description
producer.id.quota.window.num | Int | 11 | The number of samples to retain in memory for producer id quotas
producer.id.quota.window.size.seconds | Int | 1 | The time span of each sample for producer id quotas
producer.id.quota.cache.cleanup.scheduler.interval.ms | Int | 10 | The frequency in ms that the producer id quota manager will check for disposed cached windows
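Taken together, the defaults above correspond to the following server.properties entries:

producer.id.quota.window.num=11
producer.id.quota.window.size.seconds=1
producer.id.quota.cache.cleanup.scheduler.interval.ms=10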

New Quota Types

We propose to introduce the following new quota types in the Kafka broker:

Name | Type | Default | Description
producer_ids_rate | Double | Long.MaxValue | The rate at which produce requests are accepted with a new producer id.

The config will be supported for <user> only, as we are trying to avoid growth of the caching layer; users are a known quantity to the operator of the cluster and can be controlled more easily than client-ids.

...

The new metrics will be exposed by the broker:

Group | Name | Tags | Description
ProducerIds | rate | user | The current rate
ProducerIds | tokens | user | The remaining tokens in the bucket. < 0 indicates that throttling is applied.
ProducerIds | throttle-time | user | The average throttle-time per user.
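These sensors could be registered with Kafka's existing metrics library roughly as follows (the sensor name and registration point are assumptions; Rate, TokenBucket, and Avg are the stats the table implies):

Code Block
languagescala
import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.{Avg, Rate, TokenBucket}
import scala.jdk.CollectionConverters._

// Hypothetical registration of the per-user ProducerIds sensors.
def producerIdsSensor(metrics: Metrics, user: String): Sensor = {
  val tags = Map("user" -> user).asJava
  val sensor = metrics.sensor(s"ProducerIds:$user")
  sensor.add(metrics.metricName("rate", "ProducerIds",
    "The current rate", tags), new Rate())
  sensor.add(metrics.metricName("tokens", "ProducerIds",
    "The remaining tokens in the bucket; < 0 indicates throttling", tags), new TokenBucket())
  sensor.add(metrics.metricName("throttle-time", "ProducerIds",
    "The average throttle-time per user", tags), new Avg())
  sensor
}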


Client Errors

The new quota type will use QuotaViolationException, similar to ClientQuotaManager.
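As a simplified illustration of how such a violation could be turned into a client throttle time (the formula and the windowMs parameter are assumptions, loosely mirroring the rate-quota handling in ClientQuotaManager):

Code Block
languagescala
import org.apache.kafka.common.metrics.QuotaViolationException

// Hypothetical translation of a producer-id quota violation into a throttle
// time: how far the observed value overshoots the bound, scaled to the window.
def throttleTimeMs(e: QuotaViolationException, windowMs: Long): Long =
  math.max(0L, math.round(((e.value - e.bound) / e.bound) * windowMs))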

...

Code Block
languagescala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import kafka.utils.Scheduler

class ProducerIdQuotaManager[K, V](disposalSchedulerIntervalMs: Long, cleanupScheduler: Scheduler) {
  // One time-controlled bloom filter of active PIDs per key (KafkaPrincipal).
  protected val concurrentMap: ConcurrentHashMap[K, TimedBloomFilter[V]] = new ConcurrentHashMap()

  protected val schedulerIntervalMs = new AtomicLong(disposalSchedulerIntervalMs)

  // Periodically dispose of keys whose TimedBloomFilter has become empty.
  cleanupScheduler.schedule("cleanup-keys", () => {
    concurrentMap.entrySet().removeIf(e => e.getValue.isEmpty)
  }, 0L, schedulerIntervalMs.get)

  // Update the interval used by the cleanup scheduler.
  def updateDisposalSchedulerIntervalMs(intervalMs: Long): Unit = {
    schedulerIntervalMs.set(intervalMs)
  }

  def add(key: K, value: V): Unit = {
    // Add the value to the key's bloom filter (filter creation elided).
    Option(concurrentMap.get(key)).foreach(_.put(value))
  }

  def containsKeyValuePair(key: K, value: V): Boolean = {
    // Check whether the (key, value) pair exists in the cache.
    Option(concurrentMap.get(key)).exists(_.mightContain(value))
  }
}
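A hypothetical end-to-end use of this cache on the PID-tracking path described earlier (KafkaScheduler and the quota-recording step are assumptions):

Code Block
languagescala
import kafka.utils.KafkaScheduler
import org.apache.kafka.common.security.auth.KafkaPrincipal

// Hypothetical wiring: a PID is counted against the user's producer_ids_rate
// quota only the first time it is seen in the cache.
val scheduler = new KafkaScheduler(threads = 1)
scheduler.startup()

val cache = new ProducerIdQuotaManager[KafkaPrincipal, Long](
  disposalSchedulerIntervalMs = 10L, cleanupScheduler = scheduler)

val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
val producerId = 42L

if (!cache.containsKeyValuePair(user, producerId)) {
  cache.add(user, producerId) // first time this PID is seen for this user
  // record(1) on the user's producer_ids_rate sensor here
}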

Tools

kafka-configs.sh will be extended to support the new quota. A new quota property will be added, which can be applied to <user>:

  • producer_ids_rate: The number of active PIDs per quota window.

For example:

bin/kafka-configs  --zookeeper localhost:2181 --alter --add-config 'producer_ids_rate=50' --entity-name user1 --entity-type users

Default quotas for <user> can be configured by omitting the entity name. For example:

bin/kafka-configs  --zookeeper localhost:2181 --alter --add-config 'producer_ids_rate=200' --entity-type users
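The configured quotas can then be inspected with --describe. For example:

bin/kafka-configs  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1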

Known Limitations

  • As we are using a bloom filter, we might get false positives.
  • Throttling based on user will penalize every client used by the same user. However, this is a risk similar to that of existing quotas.

...