
Status

Current state: Under Discussion

Discussion thread:

JIRA: here

Motivation

Currently a client (e.g. producer or consumer) fetches metadata from the least loaded node. Because the Kafka controller sends UpdateMetadataRequest to brokers concurrently, and brokers may differ in when they process the UpdateMetadataRequest, it is possible for a client to fetch metadata that is older than the metadata already in its cache. This can cause OffsetOutOfRangeException in the consumer even when there is no log truncation in the Kafka cluster (see KAFKA-6262 for more detail). For MirrorMaker, whose offset reset policy is oldest, it can cause MM to rewind and consume from the oldest offset. This increases the latency of transmitting data from the source to the destination cluster and duplicates a lot of data in the destination cluster.

In this KIP we propose to add a metadata_epoch field to the MetadataResponse and UpdateMetadataRequest so that the client can refresh metadata if the incoming metadata is older than the metadata in its cache.

Public Interfaces

Zookeeper

Add a znode /metadata_epoch with the following JSON format:

{
  "version" : int32,
  "metadata_epoch" : int64
}

Request/Response protocol

1) Add metadata_epoch field to UpdateMetadataRequest

UpdateMetadataRequest => controller_id controller_epoch metadata_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  metadata_epoch => int64   <-- New
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]

2) Add metadata_epoch field to MetadataResponse

MetadataResponse => throttle_time_ms brokers cluster_id controller_id metadata_epoch topic_metadata
  throttle_time_ms => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  metadata_epoch => int64   <-- New
  topic_metadata => [TopicMetadata]


3) Add metadata_epoch field to OffsetCommitRequest

OffsetCommitRequest => group_id generation_id member_id retention_time topics
  group_id => str
  generation_id => int32
  member_id => str
  retention_time => int64
  topics => [OffsetCommitRequestTopic]
 
OffsetCommitRequestTopic => topic partitions
  topic => str
  partitions => [OffsetCommitRequestPartition]
 
OffsetCommitRequestPartition => partition offset metadata_epoch metadata
  partition => int32
  offset => int64
  metadata_epoch => int64      <-- NEW
  metadata => nullable_str

 

4) Add metadata_epoch field to OffsetFetchResponse

OffsetFetchResponse => throttle_time_ms responses error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16
 
OffsetFetchResponseTopic => topic partition_responses
  topic => str
  partition_responses => [OffsetFetchResponsePartition]
 
OffsetFetchResponsePartition => partition offset metadata_epoch metadata error_code
  partition => int32
  offset => int64
  metadata_epoch => int64            <-- NEW
  metadata => nullable_str
  error_code => int16
 

 

Offset topic schema

Add metadata_epoch field to the schema of the offset topic value.

OFFSET_COMMIT_VALUE_SCHEMA => offset metadata_epoch metadata commit_timestamp expire_timestamp 
  offset => int64
  metadata_epoch => int64     <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64


Consumer API

1) Add a new error code INVALID_METADATA_EPOCH. This is a non-retriable error that may be thrown from the consumer's API.

2) Add the following two methods to the interface org.apache.kafka.clients.consumer.Consumer

// Returns the current metadata_epoch in the consumer. It will be -1 if the consumer has never received any MetadataResponse.
public long metadataEpoch()
 
// Blocks until either the timeout has been reached or the consumer has received a MetadataResponse whose metadata_epoch >= minMetadataEpoch. Returns true in the latter case.
public boolean waitForMetadataUpdate(long minMetadataEpoch, long timeout)

Proposed Changes

1) Controller metadata update

Every time the controller sends an UpdateMetadataRequest, it will increment the metadata_epoch in the znode /metadata_epoch by 1 and include the new value in the UpdateMetadataRequest.
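The controller-side bump can be sketched as below. The class is an illustrative in-memory stand-in for the znode, not actual controller code; a real implementation would perform this read-modify-write against ZooKeeper (e.g. with a conditional setData) so concurrent updates cannot race.

```java
// Illustrative stand-in for the /metadata_epoch znode (names are hypothetical).
class MetadataEpochZnode {
    private long metadataEpoch; // the int64 metadata_epoch stored in the znode's JSON

    MetadataEpochZnode(long initial) {
        this.metadataEpoch = initial;
    }

    // Increment the epoch by 1 and return the new value, which the controller
    // embeds in every outgoing UpdateMetadataRequest.
    synchronized long bump() {
        metadataEpoch += 1;
        return metadataEpoch;
    }
}
```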

2) Client's Metadata refresh

After the client receives a MetadataResponse from a broker, it compares the metadata_epoch in the MetadataResponse with the metadata_epoch of the last-accepted MetadataResponse. If the incoming metadata_epoch is smaller, the client considers the MetadataResponse outdated and re-fetches metadata with the existing backoff mechanism.
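The staleness check above can be sketched as follows (class and method names are illustrative, not the actual client code):

```java
// Tracks the metadata_epoch of the last-accepted MetadataResponse on the client.
class ClientMetadataEpoch {
    private long lastAcceptedEpoch = -1L; // -1 until the first MetadataResponse is accepted

    // Returns true if the response is accepted; false if it is older than the
    // cached metadata and should be re-fetched with the existing backoff.
    boolean maybeAccept(long responseEpoch) {
        if (responseEpoch < lastAcceptedEpoch) {
            return false; // outdated MetadataResponse: discard and re-fetch
        }
        lastAcceptedEpoch = responseEpoch;
        return true;
    }

    long metadataEpoch() {
        return lastAcceptedEpoch;
    }
}
```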

3) Offset commit

When the consumer commits offsets, it includes the metadata_epoch from the last-accepted MetadataResponse in the OffsetCommitRequest.

When the coordinator receives the OffsetCommitRequest, for each partition in the request it additionally checks whether the metadata_epoch in the request is >= the metadata_epoch of the last commit. If not, the offset for this partition is not committed and the error for that partition in the OffsetCommitResponse will be INVALID_METADATA_EPOCH. If so, both the offset and the metadata_epoch for this partition are written to the offset topic.
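A sketch of the per-partition check the coordinator would perform (class and field names are illustrative):

```java
// Per-partition state the coordinator keeps for committed offsets.
class PartitionCommitState {
    long committedOffset = -1L;
    long committedEpoch = -1L; // -1 for commits from clients without the new field

    // Returns true and records the commit if the request's metadata_epoch is
    // current; returns false, meaning INVALID_METADATA_EPOCH for this partition.
    boolean tryCommit(long offset, long requestEpoch) {
        if (requestEpoch < committedEpoch) {
            return false; // stale metadata_epoch: reject this partition's commit
        }
        committedOffset = offset;
        committedEpoch = requestEpoch; // both are written to the offset topic
        return true;
    }
}
```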

4) Offset fetch

After the consumer receives the OffsetFetchResponse, it remembers the metadata_epoch for each partition it needs to consume. The consumer then refreshes metadata repeatedly until the metadata_epoch from the MetadataResponse is >= the metadata_epoch of every partition in the OffsetFetchResponse.

For the existing version of the offset topic, metadata_epoch is not available in the value of the offset topic message. We will use metadata_epoch = -1 to indicate a missing metadata_epoch. In this case the metadata_epoch in any MetadataResponse will be larger than -1 and the consumer behavior will be the same as it is now.
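The epoch the consumer must wait for is simply the maximum over the fetched partitions, with -1 (missing epoch) imposing no constraint; a sketch, where the helper class is illustrative:

```java
// Computes the minimum metadata_epoch the consumer must observe before using
// the offsets from an OffsetFetchResponse.
class OffsetFetchEpochs {
    static long requiredEpoch(long[] partitionEpochs) {
        long required = -1L; // -1: no constraint, e.g. all offsets from old clients
        for (long epoch : partitionEpochs) {
            if (epoch > required) {
                required = epoch;
            }
        }
        return required;
    }
}
```

The consumer would then keep refreshing metadata until its metadata_epoch reaches this value.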

 

5) Consumer initialization if offsets are stored externally

After obtaining the offset to be stored externally, the user needs to additionally call the newly-added API metadataEpoch() to get the metadata_epoch in the consumer. This metadata_epoch should be stored externally as well. Before the next consumer seeks to the externally-stored offset, it should first read the metadata_epoch from the external store and call waitForMetadataUpdate() to make sure the consumer has received a MetadataResponse whose metadata_epoch >= the previously-stored metadata_epoch.
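The externally-stored checkpoint would then carry both values. A minimal sketch, where ExternalCheckpoint is a hypothetical container and the commented calls use the Consumer APIs proposed above (store, consumer, and partition are illustrative names):

```java
// Hypothetical container for what the user stores externally.
class ExternalCheckpoint {
    final long offset;
    final long metadataEpoch;

    ExternalCheckpoint(long offset, long metadataEpoch) {
        this.offset = offset;
        this.metadataEpoch = metadataEpoch;
    }
}

// Saving side:
//   store.write(new ExternalCheckpoint(nextOffset, consumer.metadataEpoch()));
//
// Restoring side, before seeking to the stored offset:
//   ExternalCheckpoint cp = store.read();
//   if (!consumer.waitForMetadataUpdate(cp.metadataEpoch, timeoutMs)) {
//       // metadata did not catch up within the timeout; handle the failure
//   }
//   consumer.seek(partition, cp.offset);
```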

Compatibility, Deprecation, and Migration Plan

This KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.

Rejected Alternatives

 

 

 
