
Status

Current state: Draft

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-98 introduced the InitProducerId RPC (originally named InitPid). The purpose of this RPC is to provide unique PIDs (Producer IDs) to transactional or idempotent producer clients (features collectively referred to as EOS, or Exactly-Once Semantics). These unique PIDs are written into the header of each record batch along with other fields (producer epoch, sequence number, etc.) that are used for transactions and EOS.

The current implementation of PID generation uses a block generation scheme utilizing ZooKeeper for persistence and concurrency control. Each time a broker needs to allocate a new block of PIDs, it will use ZooKeeper’s conditional setData API to allocate the next block:

while not done {
  data, version = read_pid_znode()                        # read the current block and the znode version
  data = allocate_new_block(data)                         # compute the next block past the current one
  done = write_pid_znode_if_version_match(data, version)  # conditional write; fails if another broker raced ahead
}

This ensures that a given block of PIDs is allocated only once across the whole cluster. The broker does not persist the block anywhere; it lives only in the broker's memory, so it is lost if the broker restarts. This is not a problem in practice, since a restarted broker simply generates a new block, and the PID space is quite large.
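
For illustration, the following is a minimal Java sketch of this loop against the plain ZooKeeper client API. The znode path and plain-text encoding are simplified assumptions (the real broker code stores a JSON payload), so treat this as a sketch of the technique rather than the actual implementation.

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public final class ZkPidBlockAllocator {
    // Illustrative path and block size; not the broker's actual constants.
    private static final String PID_BLOCK_PATH = "/latest_producer_id_block";
    private static final long BLOCK_SIZE = 1000L;

    private final ZooKeeper zk;

    public ZkPidBlockAllocator(ZooKeeper zk) {
        this.zk = zk;
    }

    // Returns the first PID of a newly allocated block of BLOCK_SIZE IDs.
    public long allocateBlock() throws KeeperException, InterruptedException {
        while (true) {
            Stat stat = new Stat();
            byte[] data = zk.getData(PID_BLOCK_PATH, false, stat);
            long currentEnd = Long.parseLong(new String(data)); // assume the znode stores the last allocated PID
            long newEnd = currentEnd + BLOCK_SIZE;
            try {
                // Conditional write: succeeds only if nobody updated the znode since
                // we read it, so each block is handed out exactly once cluster-wide.
                zk.setData(PID_BLOCK_PATH, Long.toString(newEnd).getBytes(), stat.getVersion());
                return currentEnd + 1; // the new block is [currentEnd + 1, newEnd]
            } catch (KeeperException.BadVersionException e) {
                // Another broker won the race; re-read and retry.
            }
        }
    }
}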

With KIP-500, brokers which are running the bridge release version will no longer have direct access to ZooKeeper, so we must abstract this access behind the controller.

With KIP-631, the controller no longer uses ZooKeeper for persistence and instead uses the KRaft metadata log. 

This KIP aims to solve both problems by introducing a new RPC and a new method of generating Producer ID blocks using the metadata log as storage.

Public Interfaces

A new AllocateProducerIdBlock RPC, to be used by brokers to request a new block of IDs from the controller:

AllocateProducerIdBlockRequest => BrokerId BrokerEpoch
  BrokerId => int32
  BrokerEpoch => int64


AllocateProducerIdBlockResponse => ErrorCode BrokerId BrokerEpoch
                                   ProducerIdBlockStart ProducerIdBlockEnd
  ErrorCode => int16
  BrokerId => int32
  BrokerEpoch => int64
  ProducerIdBlockStart => int64
  ProducerIdBlockEnd => int64 


ProducerIdBlockStart and ProducerIdBlockEnd are inclusive. For example, if the controller allocates a block of 1000 IDs starting from zero, it will return ProducerIdBlockStart=0 and ProducerIdBlockEnd=999.

Possible errors could be:

  • ClusterAuthorizationFailed – a client with insufficient privileges tried to make this request

  • StaleBrokerEpoch – the given broker id and epoch do not match what the controller currently has

  • UnknownServerError – some unexpected error occurred on the controller

An authorization error will be considered fatal and should cause the broker to terminate. This indicates the broker is incorrectly configured to communicate with the controller. All other error types should be treated as transient and the broker should retry.
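
To make the intended policy concrete, here is a small Java sketch. Errors is Kafka's standard error-code enum, while shutdownBroker() and scheduleRetry() are hypothetical stand-ins for the broker's actual fatal-exit and retry paths.

import org.apache.kafka.common.protocol.Errors;

// Sketch of the proposed error policy, not actual broker code.
final class AllocateBlockErrorPolicy {
    void handle(Errors error) {
        switch (error) {
            case CLUSTER_AUTHORIZATION_FAILED:
                // Fatal: the broker is misconfigured for controller communication.
                shutdownBroker("not authorized to allocate producer ID blocks");
                break;
            case STALE_BROKER_EPOCH:
            case UNKNOWN_SERVER_ERROR:
            default:
                // Transient: back off and retry the allocation request.
                scheduleRetry();
                break;
        }
    }

    private void shutdownBroker(String reason) { /* hypothetical */ }
    private void scheduleRetry() { /* hypothetical */ }
}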

The AllocateProducerIdBlock RPC should be rate limited using the existing throttling mechanism. This will help guard against malicious or malfunctioning clients.


A new metadata record, to be used by the controller in the KRaft metadata log:

ProducerIdBlockRecord => BrokerId BrokerEpoch ProducerIdBlockEnd
  BrokerId => int32
  BrokerEpoch => int64
  ProducerIdBlockEnd => int64


Proposed Changes

Controller

The controller will be responsible for generating new blocks of IDs and recording each block in the KRaft metadata log. Since the controller uses a single-threaded event model, we can simply calculate the next block of IDs from what is currently in memory. The controller will need to commit a record to the metadata log so the block is durably marked as consumed and never handed out again.
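
A minimal sketch of this flow follows; all names here are illustrative assumptions rather than the actual controller classes.

// Illustrative sketch of controller-side allocation.
record ProducerIdBlockRecord(int brokerId, long brokerEpoch, long blockEnd) { }
record BlockAllocation(long blockStart, long blockEnd, ProducerIdBlockRecord record) { }

final class ProducerIdControlManager {
    private static final long BLOCK_SIZE = 1000L;
    private long nextProducerId = 0L; // rebuilt from the metadata log or snapshot on startup

    // Runs on the controller's single event thread, so no locking is needed.
    BlockAllocation allocateBlock(int brokerId, long brokerEpoch) {
        long start = nextProducerId;
        long end = start + BLOCK_SIZE - 1; // both ends inclusive, per the RPC contract
        nextProducerId = end + 1;
        // The record only needs the block end: replaying it (or reading the latest
        // one from a snapshot) restores nextProducerId, so no block is issued twice.
        return new BlockAllocation(start, end, new ProducerIdBlockRecord(brokerId, brokerEpoch, end));
    }

    // Applied when replaying the log or loading a snapshot.
    void replay(ProducerIdBlockRecord record) {
        nextProducerId = record.blockEnd() + 1;
    }
}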

It will be the responsibility of the controller to determine the PID block size. We will use a block size of 1000, as the current implementation does. We include both the block start and end in the RPC to accommodate possible changes to the block size in the future.

Each ProducerIdBlockRecord will consume 20 bytes (4 + 8 + 8) plus record overhead. Since the established upper bound on Producer IDs is Long.MAX_VALUE, the required storage could theoretically reach the petabyte range: exhausting all 2^63 IDs at 1000 IDs per block would mean roughly 9.2 × 10^15 records, or about 184 PB of payload, assuming no truncation of old records. In practice, however, we will truncate old metadata records, and such excessive producer ID generation is unlikely.

Since the RPC will be rate limited, we should be able to avoid cases of excessive metadata generation or PID block exhaustion.

One favorable side effect of using metadata records is that we will effectively have an audit log of producer ID blocks and which brokers requested them. This could be useful in some debugging situations and may warrant some API or tool to expose this history, but that goes beyond the scope of this KIP.

Snapshots

Metadata snapshots will need to include the latest Producer ID block that was committed to the metadata log. Since we only need the latest block, the impact on snapshot size is trivial.

Broker

The broker will now defer to the controller for allocating blocks of PIDs. This is done by sending an AllocateProducerIdBlockRequest to the controller over an inter-broker channel. If the request fails with a transient error, the broker will retry. The broker should use the ID block returned directly in the RPC response (as opposed to waiting for the ProducerIdBlockRecord metadata record to replicate).

Since ID block generation now involves a Kafka network request, we should consider pre-fetching blocks so a client is never left waiting too long on an InitProducerIdRequest. We will likely share an existing broker-to-controller channel for this RPC, so we cannot guarantee low or consistent latencies. One possible pre-fetching scheme is sketched below.
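
This sketch assumes a hypothetical fetchBlockFromController() that performs the AllocateProducerIdBlock round trip; the 90% threshold and all names are illustrative choices, not part of the proposal.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch only: keeps one block in use and prefetches the next once the current
// block is 90% consumed, so InitProducerId rarely waits on a controller round trip.
final class PrefetchingPidManager {
    private final BlockingQueue<long[]> incomingBlock = new ArrayBlockingQueue<>(1);
    private long nextPid = 0;
    private long blockStart = 0;
    private long blockEnd = -1;      // empty until the first block arrives
    private boolean prefetchInFlight;

    PrefetchingPidManager() {
        startPrefetch();             // fetch the first block eagerly
    }

    synchronized long nextProducerId() throws InterruptedException {
        if (nextPid > blockEnd) {
            long[] block = incomingBlock.take(); // waits only if the prefetch is still in flight
            blockStart = block[0];
            blockEnd = block[1];
            nextPid = blockStart;
            prefetchInFlight = false;
        }
        long used = nextPid - blockStart;
        long size = blockEnd - blockStart + 1;
        if (!prefetchInFlight && used >= size * 9 / 10) {
            startPrefetch();         // 90% consumed: fetch the next block in the background
        }
        return nextPid++;
    }

    private void startPrefetch() {
        prefetchInFlight = true;
        new Thread(() -> incomingBlock.offer(fetchBlockFromController())).start();
    }

    private long[] fetchBlockFromController() {
        // Hypothetical: issues AllocateProducerIdBlock and returns {start, end}.
        throw new UnsupportedOperationException("sketch only");
    }
}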


Compatibility, Deprecation, and Migration Plan

Bridge Release

For the Kafka 3.x release that eventually becomes our KRaft “bridge release”, we will need to support the existing ZooKeeper-based ID generation alongside the new KRaft approach. Since the ZooKeeper approach uses ZooKeeper’s conditional update API, it is safe to have some brokers talking directly to ZooKeeper while others use the new RPC to have the controller update ZooKeeper (as would happen during a rolling upgrade).

All brokers would need to be using the new RPC before upgrading to KRaft mode, which means clusters will need to upgrade to the bridge release before changing to KRaft mode. 

See also: Bridge Release section of KIP-500

Upgrade from ZK

In order to upgrade from the ZK-based ID block generation, we will need to ensure that the ID blocks generated by the quorum controller do not overlap with those previously generated by ZK. This can be done by reading the latest producer ID block from ZK and generating an equivalent record in the metadata log. This will need to be incorporated into the overall KRaft upgrade plan once that is available.
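
As a rough sketch of that seeding step (the znode format and the parseBlockEnd() and appendProducerIdBlockRecord() helpers are all assumptions here, not a defined migration API):

import org.apache.zookeeper.ZooKeeper;

// Sketch only: read the last ZK-allocated block and seed the metadata log so
// the quorum controller starts allocating strictly after it.
long seedProducerIdsFromZk(ZooKeeper zk) throws Exception {
    byte[] data = zk.getData("/latest_producer_id_block", false, null);
    long zkBlockEnd = parseBlockEnd(data); // hypothetical: decode the stored block metadata
    // Writing an equivalent record makes the controller's in-memory cursor
    // resume at zkBlockEnd + 1, so KRaft blocks can never overlap ZK blocks.
    appendProducerIdBlockRecord(zkBlockEnd); // hypothetical: emits a ProducerIdBlockRecord with this block end
    return zkBlockEnd + 1;
}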


Rejected Alternatives

One alternative would be to pre-allocate IDs in an uncoordinated manner using the broker ID. A scheme like this could probably be made to work, but it might be hard to reconcile with previously ZK-generated ID ranges, and some care would be needed to account for brokers added in the future. It would also require brokers to use durable storage for their own ID ranges. Using the controller is not much more complicated than an uncoordinated approach (possibly it’s even simpler), and it makes correctness easier to ensure.

