Status
Current state: Accepted
Discussion thread: here
Voting thread: here
JIRA:
...
New AllocateProducerIds RPC to be used by brokers to request a new block of IDs from the controller. The use of this RPC in ZK mode is enabled by selecting an IBP of 3.0-IV0 or higher. This RPC is always used in KRaft mode.
    AllocateProducerIdsRequest => BrokerId BrokerEpoch
      BrokerId => int32
      BrokerEpoch => int64
...
A new metadata record to be used by the controller in the KRaft metadata log
    AllocateProducerIdsRecord => BrokerId BrokerEpoch ProducerIdEnd
      BrokerId => int32
      BrokerEpoch => int64
      ProducerIdEnd => int64
...
Proposed Changes
Controller
In both ZK and KRaft modes, the controller will now be responsible for generating new blocks of IDs and persisting the latest generated block. In ZK mode, the controller will use the existing ZNode and JSON format for persistence. In KRaft mode, the controller will commit a new record to the metadata log. Since the controller (in either mode) uses a single-threaded event model, we can simply calculate the next block of IDs based on what is currently in memory. The controller will need to persist each generated PID block so it can be “consumed” and never used again.
It will be the responsibility of the controller to determine the PID block size. We will use a block size of 1000, as the current implementation does. We include the block start and length in the RPC to accommodate possible changes to the block size in the future.
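The allocation step described above can be sketched roughly as follows. This is a minimal illustration, not the actual controller code: the class names `ProducerIdBlockSketch`, `Block`, `BlockStore`, and `Allocator` are hypothetical, and `BlockStore` merely stands in for the ZNode (ZK mode) or the metadata log (KRaft mode). The sketch assumes it is only ever called from the controller's single event thread, which is why it uses no locking.

```java
// Hypothetical sketch; none of these names are Kafka's actual classes.
final class ProducerIdBlockSketch {
    static final int BLOCK_SIZE = 1000; // block size stated in this proposal

    // A contiguous block of producer IDs covering [firstId, firstId + length).
    static final class Block {
        final int brokerId;
        final long firstId;
        final int length;

        Block(int brokerId, long firstId, int length) {
            this.brokerId = brokerId;
            this.firstId = firstId;
            this.length = length;
        }
    }

    // Stand-in for the ZNode (ZK mode) or the metadata log (KRaft mode).
    interface BlockStore {
        void persist(Block block);
    }

    // Assumed to run only on the controller's single event thread, so no locking.
    static final class Allocator {
        private final BlockStore store;
        private long nextFirstId; // in-memory view of the next unallocated ID

        Allocator(BlockStore store, long nextFirstId) {
            this.store = store;
            this.nextFirstId = nextFirstId;
        }

        Block allocate(int brokerId) {
            if (nextFirstId > Long.MAX_VALUE - BLOCK_SIZE) {
                throw new IllegalStateException("Producer ID space exhausted");
            }
            Block block = new Block(brokerId, nextFirstId, BLOCK_SIZE);
            // Persist before handing the block out so it is "consumed" and never reused.
            store.persist(block);
            nextFirstId += BLOCK_SIZE;
            return block;
        }
    }
}
```

Returning both the start and the length of the block, rather than assuming a fixed size on the broker side, is what would let the controller change the block size later without a protocol change.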
The AllocateProducerIdsRecord will consume 20 bytes plus record overhead. Since the established upper bound on producer IDs is Long.MAX_VALUE, the required storage could theoretically reach the petabyte range (assuming 1000 IDs per block and no truncation of old records). In practice, however, we will truncate old metadata records, and producer ID generation on that scale is unlikely.
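As a back-of-the-envelope check of the figures above, the following sketch computes the worst-case payload size. It assumes the 20 bytes come from the three record fields (int32 BrokerId, int64 BrokerEpoch, int64 ProducerIdEnd) and ignores record overhead; the class name `ProducerIdStorageEstimate` is purely illustrative.

```java
// Illustrative arithmetic only; ignores per-record overhead in the metadata log.
final class ProducerIdStorageEstimate {
    // int32 BrokerId + int64 BrokerEpoch + int64 ProducerIdEnd
    static final long RECORD_BYTES = 4 + 8 + 8;
    static final long BLOCK_SIZE = 1000;

    // Bytes needed if every possible block were allocated and no record were truncated.
    static long worstCaseBytes() {
        long blocks = Long.MAX_VALUE / BLOCK_SIZE;
        return blocks * RECORD_BYTES;
    }

    public static void main(String[] args) {
        long bytes = worstCaseBytes();
        long petabytes = bytes / 1_000_000_000_000_000L; // 10^15 bytes per PB
        // Prints: 184467440737095500 bytes, about 184 PB
        System.out.println(bytes + " bytes, about " + petabytes + " PB");
    }
}
```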
...
The broker will now defer to the controller for allocating blocks of PIDs. This is done by sending an AllocateProducerIdsRequest to the controller using an inter-broker channel. If the request fails, the broker will retry for certain transient errors. The broker should use the ID block returned by the RPC (as opposed to waiting to replicate the AllocateProducerIdsRecord metadata record).
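A rough sketch of the broker-side request loop might look like the following. The proposal does not say which errors count as transient, so the `Outcome` classification here is an assumption, as are the names `AllocateWithRetrySketch`, `Response`, and `allocateWithRetry`. The `Supplier` stands in for sending the RPC over the inter-broker channel.

```java
import java.util.function.Supplier;

// Hypothetical sketch; the error classification is an assumption, not part of the proposal.
final class AllocateWithRetrySketch {
    enum Outcome { OK, TRANSIENT_ERROR, FATAL_ERROR }

    static final class Response {
        final Outcome outcome;
        final long firstId; // start of the allocated block (meaningful only when OK)
        final int length;   // number of IDs in the block (meaningful only when OK)

        Response(Outcome outcome, long firstId, int length) {
            this.outcome = outcome;
            this.firstId = firstId;
            this.length = length;
        }
    }

    // sendRequest stands in for sending an AllocateProducerIdsRequest to the controller.
    static Response allocateWithRetry(Supplier<Response> sendRequest, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Response response = sendRequest.get();
            if (response.outcome == Outcome.OK) {
                // Use the block from the RPC directly, without waiting for metadata replication.
                return response;
            }
            if (response.outcome == Outcome.FATAL_ERROR) {
                throw new IllegalStateException("Non-retriable error from controller");
            }
            // Transient error: try again (a real implementation would back off here).
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts");
    }
}
```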
Since there is now a Kafka network request involved in the ID block generation, we should consider pre-fetching blocks so a client is never waiting on an InitProducerIdRequest for too long. We will likely share an existing broker-controller channel for this RPC, so we cannot guarantee low or consistent latencies.
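One possible shape for the pre-fetching idea is sketched below: the broker requests the next block once most of the current one has been handed out, so the switch to a new block does not have to wait on the controller. The 90% threshold, the class name `PrefetchingIdGeneratorSketch`, and the `LongSupplier` standing in for the RPC are all assumptions made for illustration. The fetch is done inline to keep the sketch short; a real implementation would issue it asynchronously.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of broker-side pre-fetching; threshold and names are assumptions.
final class PrefetchingIdGeneratorSketch {
    static final int BLOCK_SIZE = 1000;
    static final int PREFETCH_AFTER = 900; // request the next block once 90% is used

    private final LongSupplier fetchBlockStart; // stand-in for the RPC; returns a block's first ID
    private long next;                          // next ID to hand out
    private long end;                           // exclusive end of the current block
    private long prefetchedStart = -1;          // -1 means no block has been pre-fetched

    PrefetchingIdGeneratorSketch(LongSupplier fetchBlockStart) {
        this.fetchBlockStart = fetchBlockStart;
        this.next = fetchBlockStart.getAsLong();
        this.end = next + BLOCK_SIZE;
    }

    synchronized long nextProducerId() {
        if (next == end) {
            // Current block exhausted: switch to the pre-fetched block, or fetch on demand.
            long start = prefetchedStart >= 0 ? prefetchedStart : fetchBlockStart.getAsLong();
            prefetchedStart = -1;
            next = start;
            end = start + BLOCK_SIZE;
        }
        long id = next++;
        long used = BLOCK_SIZE - (end - next);
        if (prefetchedStart < 0 && used >= PREFETCH_AFTER) {
            // Done inline here for brevity; real code would do this asynchronously.
            prefetchedStart = fetchBlockStart.getAsLong();
        }
        return id;
    }
}
```

With this arrangement, the on-demand fetch is only a fallback for the case where the pre-fetch has not happened by the time the current block runs out.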
...