...
This KIP aims to solve both problems by introducing a new RPC and a new method of generating Producer ID blocks using the metadata log as storage.
Public Interfaces
New AllocateProducerIdBlock RPC AllocateProducerId RPC to be used by brokers to request a new block of IDs from the controller
Code Block |
---|
AllocateProducerIdBlockRequestAllocateProducerIdRequest => BrokerId BrokerEpoch BrokerId => int32 BrokerEpoch => int64 |
Code Block |
---|
AllocateProducerIdBlockResponseAllocateProducerIdResponse => ErrorCode ProducerIdBlockStartProducerIdStart ProducerIdBlockEndProducerIdLen ErrorCode => int16 ProducerIdBlockStartProducerIdStart => int64 ProducerIdBlockEndProducerIdLen => int64int32 |
ProducerIdBlockStart and ProducerIdBlockEnd are ProducerIdStart is inclusive. E.g., if the controller generates 1000 IDs starting from zero, it will return ProducerIdBlockStartreturn ProducerIdStart=0 and ProducerIdBlockEnd=999and ProducerIdLen=1000 which represents IDs 0 through 999.
Possible errors could be:
...
An authorization error will be considered fatal and should cause the broker to terminate. This indicates the broker is incorrectly configured to communicate with the controller. All other error types should be treated as transient and the broker should retry.
The AllocateProducerIdBlock AllocateProducerId RPC should be rate limited using the existing throttling mechanism. This will help guard against malicious or malfunctioning clients.
...
A new metadata record to be used by the controller in the KRaft metadata log
Code Block |
---|
ProducerIdBlockRecordProducerIdRecord => BrokerId BrokerEpoch ProducerIdBlockEndProducerIdEnd BrokerId => int32 BrokerEpoch => int64 ProducerIdBlockEndProducerIdEnd => int64 |
Proposed Changes
Controller
...
It will be the responsibility of the controller to determine the PID block size. We will use a block size of 1000 like the current implementation does. We include the block start and length in the RPC to accommodate for possible changes to the block size in the future.
The ProducerIdBlockRecord ProducerIdRecord will consume 20 bytes plus record overhead. Since the established upper bound on Producer ID is Long.MAX_VALUE, the required storage could theoretically be in the petabyte range (assuming 1000 IDs per block and no truncation of old records). However, in practice, we will truncate old metadata records and it is unlikely to see such excessive producer ID generation.
...
The broker will now defer to the controller for allocating blocks of PIDs. This is done by sending an AllocateProducerIdBlockRequest AllocateProducerIdRequest to the controller using an inter-broker channel. If the request fails, the broker will retry for certain transient errors. The broker should use the ID block returned by the RPC (as opposed to waiting to replicate the ProducerIdBlockRecord ProducerIdRecord metadata record).
Since there is now a Kafka network request involved in the ID block generation, we should consider pre-fetching blocks so a client is never waiting on an InitProducerIdRequest for too long. We will likely share an existing broker-controller channel for this RPC, so we cannot guarantee low or consistent latencies.
...