...

Note that the current default transaction.timeout.ms is one minute, which is too long for Kafka Streams EOS use cases. Since the default commit interval under EOS is only 100 milliseconds, we are bound to hit the session timeout if offsets are not actively committed within that tight window. We therefore suggest shrinking the transaction timeout on Kafka Streams to the same default value as the session timeout (10 seconds). A pending transaction left behind by a crashed instance is only aborted once its timeout expires, so a shorter timeout reduces the potential performance loss from offset fetch delays when some instances accidentally crash.

...

We shall set the `transaction.timeout.ms` default to 10000 ms (10 seconds) on Kafka Streams.
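
An application that wants the 10-second value explicitly (for example, on a release without the new Streams default) could pass it to the Streams-internal producers through the `producer.` config prefix. This is a minimal sketch under stated assumptions: it uses plain string keys so it compiles without Kafka on the classpath, and the `application.id` and `bootstrap.servers` values are placeholders. With kafka-streams available, `StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)` resolves to the same `producer.transaction.timeout.ms` key.

```java
import java.util.Properties;

final class StreamsEosConfigSketch {
    static Properties eosProperties() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-eos-app");        // placeholder application id
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("processing.guarantee", "exactly_once");
        // The "producer." prefix routes this setting to the producers Streams creates.
        props.setProperty("producer.transaction.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // prints 10000
        System.out.println(eosProperties().getProperty("producer.transaction.timeout.ms"));
    }
}
```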

Offset Fetch Request 

We will add a new error code telling the consumer to wait for pending transactions to clear. In order to return the corresponding exceptions to old and new clients, we shall also bump the OffsetFetch protocol version.

...

This offset fetch back-off should only be applied to EOS use cases, not to general offset fetch use cases such as admin client access. We shall therefore also define a flag within the offset fetch request, so that the back-off logic is only triggered when the request comes from a consumer at isolation level read_committed.

```
OffsetFetchRequest => Partitions GroupId WaitTransaction
  Partitions          => List<TopicPartition>
  GroupId             => String
  WaitTransaction     => Boolean // NEW
```
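
The broker-side back-off decision described above can be sketched as follows. Everything here is hypothetical: partitions are plain strings, `WAIT_FOR_PENDING_TXN` stands in for the new error code (whose name is not given in this section), and `FIRST_VERSION_WITH_FLAG` is a placeholder for whichever OffsetFetch version introduces the flag. The point is the gating: the broker only backs off when the request is new enough to carry the flag, the flag is set, and the partition actually has a pending transactional offset commit.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class OffsetFetchBackoffSketch {
    // Hypothetical placeholder: first OffsetFetch version carrying the wait flag.
    static final short FIRST_VERSION_WITH_FLAG = 7;

    // Hypothetical per-partition outcomes for this sketch.
    enum Outcome { RETURN_OFFSET, WAIT_FOR_PENDING_TXN }

    static Map<String, Outcome> handle(short version, boolean waitTransaction,
                                       List<String> partitions,
                                       Set<String> pendingTxnPartitions) {
        // Older clients cannot send the flag, so they never receive the new error.
        boolean backOff = version >= FIRST_VERSION_WITH_FLAG && waitTransaction;
        Map<String, Outcome> result = new LinkedHashMap<>();
        for (String tp : partitions) {
            boolean pending = pendingTxnPartitions.contains(tp);
            result.put(tp, backOff && pending ? Outcome.WAIT_FOR_PENDING_TXN : Outcome.RETURN_OFFSET);
        }
        return result;
    }
}
```

An admin client would send the flag as false and therefore always get the committed offset back immediately, even while a transaction is pending.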

Fence Zombie

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage the group coordinator to fence out-of-sync clients.

To give the transactional producer access to consumer state, the consumer will expose a new API that returns some of its internal state as an opaque struct:

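```java
import java.util.Optional;

// Public: an opaque handle returned to users
public interface GroupMetadata {
}

// Private (internal): consumer state the txn producer needs for fencing
final class ConsumerGroupMetadata implements GroupMetadata {
  final String groupId;
  final int generationId;
  final String memberId;
  final Optional<String> groupInstanceId;

  ConsumerGroupMetadata(String groupId, int generationId,
                        String memberId, Optional<String> groupInstanceId) {
    this.groupId = groupId;
    this.generationId = generationId;
    this.memberId = memberId;
    this.groupInstanceId = groupInstanceId;
  }
}

// Consumer API (new method on the consumer)
public GroupMetadata groupMetadata();
```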

The producer can then poll this metadata as needed during normal processing.
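
A minimal client-side sketch of that polling, assuming simplified stand-in types (the records below are hypothetical, not Kafka classes): the producer reads the consumer's metadata at each transactional offset commit, so that a generation bumped by a rebalance ends up in the fields of the `TxnOffsetCommitRequest` schema.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the consumer's opaque group metadata.
record ConsumerGroupMetadata(String groupId, int generationId,
                             String memberId, Optional<String> groupInstanceId) {}

// Hypothetical stand-in for the TxnOffsetCommitRequest fields.
record TxnOffsetCommitFields(String transactionalId, String groupId, long producerId,
                             short producerEpoch, Map<String, Long> offsets,
                             int generationId, String memberId, String groupInstanceId) {}

final class TxnCommitClientSketch {
    // Called at each commit with the consumer's current metadata, so a rebalance
    // that bumped the generation is reflected instead of a stale cached value.
    static TxnOffsetCommitFields build(String transactionalId, long producerId,
                                       short producerEpoch, Map<String, Long> offsets,
                                       ConsumerGroupMetadata metadata) {
        return new TxnOffsetCommitFields(transactionalId, metadata.groupId(), producerId,
                producerEpoch, offsets, metadata.generationId(), metadata.memberId(),
                // In this sketch an absent group instance id is sent as null.
                metadata.groupInstanceId().orElse(null));
    }
}
```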

A new generation.id field shall be added to the `TxnOffsetCommitRequest` request. We also suggest adding member.id and group.instance.id to the request, so that txn offset commit fencing is consistent with normal offset commit fencing.

```
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId MemberId GroupInstanceId
  TransactionalId     => String
  GroupId             => String
  ProducerId          => int64
  ProducerEpoch       => int16
  Offsets             => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32 // NEW
  MemberId            => String // NEW
  GroupInstanceId     => String // NEW
```
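
A coordinator-side check over the three new fields might look roughly like the sketch below. It is a simplified model under stated assumptions: `GroupState` is a hypothetical stand-in for the coordinator's group metadata, the error names mirror those Kafka uses for normal offset commit fencing (`FENCED_INSTANCE_ID`, `UNKNOWN_MEMBER_ID`, `ILLEGAL_GENERATION`), and the order of the checks is an assumption. The edge case discussed below is not modeled.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical snapshot of the coordinator's view of a group.
record GroupState(int generationId, Set<String> memberIds,
                  Map<String, String> staticMembers) {} // groupInstanceId -> memberId

// Error names mirror normal offset commit fencing; NONE means the commit is accepted.
enum TxnCommitError { NONE, FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

final class TxnCommitFencingSketch {
    static TxnCommitError validate(GroupState group, int generationId,
                                   String memberId, Optional<String> groupInstanceId) {
        // A static instance id now owned by a different member id indicates a zombie.
        if (groupInstanceId.isPresent()) {
            String owner = group.staticMembers().get(groupInstanceId.get());
            if (owner != null && !owner.equals(memberId)) {
                return TxnCommitError.FENCED_INSTANCE_ID;
            }
        }
        // A member that is no longer in the group cannot commit.
        if (!group.memberIds().contains(memberId)) {
            return TxnCommitError.UNKNOWN_MEMBER_ID;
        }
        // A stale generation means the client missed a rebalance: fence it immediately.
        if (generationId != group.generationId()) {
            return TxnCommitError.ILLEGAL_GENERATION;
        }
        return TxnCommitError.NONE;
    }
}
```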

If the generation.id does not match the group's current generation, the client will be fenced immediately. An edge case is defined as:

...