...

  • /config/users/<user>
  • /config/users/<default>

 ProducerIdQuotaManager

...

  • will be applied per KafkaPrincipal as it's a smaller subset which is known to the administrator of the cluster. It will not be applied to ClientId (which is not enforced by the client config) nor a combination of KafkaPrincipal and ClientId.
  • will keep a cache of user (KafkaPrincipal) to unique active PIDs in order to track active PIDs. The cache is used only to decide whether we have encountered a PID before for a given KafkaPrincipal, not as a counter. The cache will be implemented using a simple bloom filter controlled by time to avoid any unwanted growth that might cause OOM. (More details on this are explained in the next section.)
  • will add quota rate metrics that are incremented only when the caching layer does not already contain the PID for the given user:
    • If the caching layer doesn't contain the PID, the quota manager will add the PID for this user to the cache and increment the quota rate metrics.
    • If the caching layer contains the PID, there is no need to update the cache or increment the quota rate metrics.
  • will throttle users once they reach the allowed quota. The quota manager will throw a QuotaViolationException, similar to existing quotas in Kafka, and the client will receive a ProduceResponse with throttleTimeMs, similar to the throttling of bandwidth or request quotas; see the sketch after this list.
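
A minimal sketch of this check-then-throttle flow is shown below. The method name maybeRecordNewPid, the throttleTimeMsFor helper, and the way the PID cache and rate sensor are passed in are illustrative assumptions rather than the final implementation; the cache class itself is described later in this KIP.

Code Block
languagescala
import org.apache.kafka.common.metrics.{QuotaViolationException, Sensor}
import org.apache.kafka.common.security.auth.KafkaPrincipal

// Illustrative sketch only: helper names are assumptions, not the final implementation.
def maybeRecordNewPid(pidCache: ProducerIdQuotaManager[KafkaPrincipal, Long],
                      producerIdRateSensor: Sensor,
                      throttleTimeMsFor: QuotaViolationException => Int,
                      principal: KafkaPrincipal,
                      pid: Long,
                      nowMs: Long): Int = {
  if (pidCache.containsKeyValuePair(principal, pid)) {
    0 // PID already seen for this user: no cache update, no metric update, no throttling
  } else {
    pidCache.add(principal, pid)              // remember this PID for the user
    try {
      producerIdRateSensor.record(1.0, nowMs) // rate metric counts only newly seen PIDs
      0                                       // still within quota: no throttling
    } catch {
      case e: QuotaViolationException =>
        throttleTimeMsFor(e)                  // returned to the client as ProduceResponse.throttleTimeMs
    }
  }
}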

Caching layer to track active PIDs per KafkaPrincipal

The cache will be represented as a map of KafkaPrincipal to a time-controlled bloom filter (TimedBloomFilter). A user's PIDs in the caching layer will go through the following steps in their lifecycle:

  • Step1: Adding the first PID for a user (let's call this user userA) will create an entry for this user in the cache with its first bloom filter in the cached map (let's call it bloom_filter_1).
    • Any new PIDs will be added to this filter during the first half of producer.id.quota.window.size.seconds.
    • Now the cache layer is storing the following entry for the user

      Code Block
      languageactionscript3
      Map { "UserA" -> TimedBloomFilter {
      					 bloom_filter_1_create_timestamp -> bloom_filter_1
      					}
      	}


  • Step2: A new bloom filter will be created alongside the old one for the user for the second half of producer.id.quota.window.size.seconds (let's call it bloom_filter_2).
    • All new PIDs from this point will be added to the new filter. 
    • Both bloom filters will be used to check if we came across the same PID before or not. 
    • Now the cache layer is storing the following entry for the user

      Code Block
      languageactionscript3
      Map { "UserA" -> TimedBloomFilter {
      					 bloom_filter_1_create_timestamp -> bloom_filter_1,
       					 bloom_filter_2_create_timestamp -> bloom_filter_2
      					}
      	}


  • Step3: The old bloom filter (bloom_filter_1) will be disposed of once we reach producer.id.quota.window.size.seconds.
    • Now the cache layer is storing the following entry for the user 

      Code Block
      languageactionscript3
      Map { "UserA" -> TimedBloomFilter {
       					 bloom_filter_2_create_timestamp -> bloom_filter_2
      					}
      	}


  • Step4: Repeat steps 2 and 3 on bloom_filter_2.
    • Now the cache layer is storing the following entry for the user 

      Code Block
      languageactionscript3
      Map { "UserA" -> TimedBloomFilter {
      					 bloom_filter_2_create_timestamp -> bloom_filter_2,
       					 bloom_filter_3_create_timestamp -> bloom_filter_3
      					}
      	}


This way, each user within a window will have two bloom filters (both available for reads while only one of them is active for writes). When we need to check whether we have come across a PID before for a given user, we check whether any of the user's bloom filters contains the PID.
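
To make this lifecycle concrete, below is a minimal sketch of such a time-controlled filter: writes always go to the newest filter, reads check every live filter, and filters older than the full window are disposed. The class name, sizing, and hashing below are illustrative assumptions, not the proposed TimeControlledBloomFilter implementation.

Code Block
languagescala
import scala.collection.mutable

// Illustrative sketch only: sizing and hashing are placeholders.
class SimpleTimedBloomFilter[V](windowSizeMs: Long, bits: Int = 1 << 16, numHashes: Int = 3) {
  private case class Slice(createTimeMs: Long, bitset: mutable.BitSet)
  private val slices = mutable.ListBuffer.empty[Slice]

  // Derive numHashes bit positions from the value's hash code.
  private def positions(value: V): Seq[Int] =
    (1 to numHashes).map(seed => Math.floorMod(value.hashCode * 31 + seed, bits))

  def put(value: V, nowMs: Long): Unit = {
    expire(nowMs)
    // Start a new filter when none exists or the newest one has covered half the window (Step 2).
    if (slices.isEmpty || nowMs - slices.last.createTimeMs >= windowSizeMs / 2)
      slices += Slice(nowMs, mutable.BitSet.empty)
    positions(value).foreach(slices.last.bitset.add) // writes always go to the newest filter
  }

  // Reads check every live filter, so PIDs seen in the previous half-window are still found.
  def mightContain(value: V, nowMs: Long): Boolean = {
    expire(nowMs)
    slices.exists(s => positions(value).forall(s.bitset.contains))
  }

  def isEmpty: Boolean = slices.isEmpty

  // Dispose of filters older than the full window (Step 3).
  private def expire(nowMs: Long): Unit =
    slices.filterInPlace(s => nowMs - s.createTimeMs < windowSizeMs)
}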

...

Name | Type | Default | Description
producer.id.quota.window.num | Int | 11 | The number of samples to retain in memory for producer id quotas
producer.id.quota.window.size.seconds | Int | 3600 | The time span of each sample for producer id quotas. Default is 1hr.
producer.id.quota.cache.cleanup.scheduler.interval.ms | Int | 10 | The frequency in ms that the producer id quota manager will check for disposed cached windows

producer.id.quota.window.num and producer.id.quota.window.size.seconds are similar to the rest of the Kafka quota configs.
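
For illustration only, the three configs above could be declared with Kafka's ConfigDef as sketched below; this wiring is an assumption made for the example, not part of the proposal.

Code Block
languagescala
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}

// Sketch of the new configs from the table above; the ConfigDef wiring is an
// assumption for illustration, not the KIP's broker code.
val producerIdQuotaConfigs: ConfigDef = new ConfigDef()
  .define("producer.id.quota.window.num", Type.INT, 11, Importance.LOW,
    "The number of samples to retain in memory for producer id quotas")
  .define("producer.id.quota.window.size.seconds", Type.INT, 3600, Importance.LOW,
    "The time span of each sample for producer id quotas (default is 1 hour)")
  .define("producer.id.quota.cache.cleanup.scheduler.interval.ms", Type.INT, 10, Importance.LOW,
    "The frequency in ms to check for disposed cached windows")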

New Quota Types

We propose to introduce the following new quota types in the Kafka broker:

...

The new quota type will use QuotaViolationException, similar to ClientQuotaManager, and the client will receive a ProduceResponse with throttleTimeMs, similar to the throttling of bandwidth or request quotas.



New TimeControlledBloomFilter

...

Code Block
languagescala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

class ProducerIdQuotaManager[K, V](disposalSchedulerIntervalMs: Long, cleanupScheduler: Scheduler) {
    // Maps each key (the KafkaPrincipal) to the time-controlled bloom filter tracking its active PIDs
    protected val concurrentMap: ConcurrentHashMap[K, TimedBloomFilter[V]] = new ConcurrentHashMap()

    protected val schedulerIntervalMs = new AtomicLong(disposalSchedulerIntervalMs)

    // Periodically drop keys whose TimedBloomFilter no longer holds any window
    cleanupScheduler.schedule("cleanup-keys", () => {
        // Cleanup keys that have an empty TimedBloomFilter
    }, 0L, schedulerIntervalMs.get())

    def disposalSchedulerIntervalMs(intervalMs: Long): Unit = {
        schedulerIntervalMs.set(intervalMs)
    }

    def add(key: K, value: V): ControlledCachedMap[K, V] = {
        // Add value to the key's bloom filter
    }

    def containsKeyValuePair(key: K, value: V): Boolean = {
        // Check if (key, value) exists in the cache
    }
}
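
A brief usage sketch of this cache follows; the KafkaScheduler construction, the Long PID type parameter, and the values used are assumptions made for the example (it assumes the cleanupScheduler parameter is Kafka's kafka.utils.Scheduler).

Code Block
languagescala
import kafka.utils.KafkaScheduler
import org.apache.kafka.common.security.auth.KafkaPrincipal

// Illustrative usage of the cache above; scheduler setup and values are assumptions.
val scheduler = new KafkaScheduler(threads = 1)
scheduler.startup()

val pidCache = new ProducerIdQuotaManager[KafkaPrincipal, Long](
  disposalSchedulerIntervalMs = 10L, cleanupScheduler = scheduler)

val userA = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA")
if (!pidCache.containsKeyValuePair(userA, 1234L)) {
  pidCache.add(userA, 1234L) // first time we see PID 1234 for userA: count it against the quota
}
pidCache.containsKeyValuePair(userA, 1234L) // true (modulo bloom-filter false positives)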

...

  • As we are using a bloom filter, we might get false positives.
  • Throttling based on user will punish any client used by the same user. However, this is a similar risk to existing quotas that are applied only at the KafkaPrincipal level.
  • Similar to other quotas in Kafka, all throttling is applied per broker, which means that if leadership changes, the new leader will start throttling from zero if it has never had this KafkaPrincipal producing to it before.
  • Some producers might get throttled for a long time, depending on the configuration, instead of crashing. This may go unnoticed for some producers, especially if they don't alert on the throttle metric or other metrics that would flag that the producer has stopped producing.

Compatibility, Deprecation, and Migration Plan

...

    1. INIT_PRODUCER_ID for an idempotent producer requests PIDs from a random controller every time, so a client being throttled on one controller doesn't guarantee it won't go through on the next controller, causing OOM at the leader later.
    2. The problem happens on the activation of the PID when it produces, not at initialisation, which means Kafka wouldn't have an OOM problem if the producer got assigned a PID but crashed before producing anything.
    3. Throttling producers that crash between initialisation and producing could slow them down when they recover or fix the problem that caused them to crash right after initialising the PID.

5. Throttle PIDs based on IPs

...