...

Code Block
interface Producer {
  /**
   * This API shall be called for consumer group aware transactional producers.
   */
  void initTransactions(Consumer<byte[], byte[]> consumer); // NEW

  /**
   * The consumer group id no longer needs to be passed in, since we already have access to the consumer state.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) throws ProducerFencedException, IllegalGenerationException; // NEW
}

The new producer API will be tightly coupled with the consumer group. We choose to define a new producer config `transactional.group.id`.

Code Block
languagejava
titleProducerConfig.java
public static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id";

This config value will be automatically set to the consumer group id in a Kafka Streams app. Plain EOS users need to manually configure it to the consumer group id if they are not following the recommended practice below of calling `initTransactions(consumer)`.
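
For a plain EOS application, the configuration might look like the sketch below. The constant mirrors the proposed `ProducerConfig` addition; the bootstrap address and group id are placeholders, not values from this KIP:

```java
import java.util.Properties;

public class TransactionalGroupConfigExample {
    // Mirrors the proposed ProducerConfig constant from this KIP.
    public static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id";

    public static Properties producerConfigs(String consumerGroupId) {
        Properties props = new Properties();
        // Placeholder broker address.
        props.put("bootstrap.servers", "localhost:9092");
        // Plain EOS users set the transactional group id to their consumer
        // group id when not calling initTransactions(consumer).
        props.put(TRANSACTIONAL_GROUP_ID, consumerGroupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerConfigs("my-consumer-group");
        System.out.println(props.getProperty(TRANSACTIONAL_GROUP_ID));
    }
}
```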

We could effectively unset the `transactional.id` config in Kafka Streams because we no longer use it for revoking ongoing transactions. Instead we would stick to consumer group id when we rely on group membership. To enable this, we need to extend the InitProducerId API to support consumer group aware initialization.

Below we provide the new InitProducerId schema:

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs TransactionalGroupId
  TransactionalId => NullableString
  TransactionTimeoutMs => Int64
  TransactionalGroupId => NullableString         // NEW

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int64
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16

The new InitProducerId API accepts either a user-configured transactional id or a transactional group id. When a transactional group id is provided, the transaction coordinator will honor it and allocate a new producer.id every time initTransactions is called. A small optimization to help reuse producer.id is to piggy-back the consumer member.id as the transactional.id field during the first rebalance and keep it unchanged afterwards. A further optimization is to use `group.instance.id` if it is set on the consumer side, since it is guaranteed to be unique.
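
The selection logic for the piggy-backed transactional.id described above can be sketched as follows; the method and parameter names here are illustrative, not the KIP's actual accessors:

```java
import java.util.Optional;

public class TransactionalIdSelection {
    /**
     * Pick the value to piggy-back as transactional.id so the coordinator
     * can reuse the same producer.id: prefer the static group.instance.id
     * when set (guaranteed unique), otherwise fall back to the member.id
     * assigned during the first rebalance.
     */
    static String chooseTransactionalId(Optional<String> groupInstanceId, String memberId) {
        return groupInstanceId.orElse(memberId);
    }

    public static void main(String[] args) {
        System.out.println(chooseTransactionalId(Optional.of("instance-1"), "member-42"));
        System.out.println(chooseTransactionalId(Optional.empty(), "member-42"));
    }
}
```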

The main challenge when we choose a random transactional.id is authorization. We currently use the transactional id to authorize transactional operations. In this KIP, we could instead use the transactional.group.id (the consumer group id) for authorization. A producer running in standalone mode must still provide a transactional id, though.

Fencing zombies

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage the group coordinator to fence out-of-generation clients.

...

Code Block
1. Client A tries to commit offsets for topic partition P1, but hasn't had the chance to do the txn offset commit before a long GC.
2. Client A gets out of sync and becomes a zombie due to session timeout, and the group rebalances.
3. Another client B is assigned P1.
4. Client B doesn't see the pending offsets because A hasn't committed anything, so it will proceed with potentially `pending` input data.
5. Client A comes back online and continues trying to do the txn commit. Here, if we have the generation.id, we will catch it!
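
A minimal sketch of the coordinator-side check that step 5 relies on; the class and exception names here are illustrative stand-ins, not the broker's actual implementation:

```java
public class GenerationFencingExample {
    // Stand-in for the broker-side IllegalGenerationException.
    static class IllegalGenerationException extends RuntimeException {
        IllegalGenerationException(String msg) { super(msg); }
    }

    /**
     * Reject a txn offset commit whose generation id is stale, i.e. the
     * sender fell out of the group and a rebalance bumped the generation
     * (as happened to zombie client A in the scenario above).
     */
    static void validateGeneration(int requestGenerationId, int currentGenerationId) {
        if (requestGenerationId != currentGenerationId) {
            throw new IllegalGenerationException(
                "Commit from generation " + requestGenerationId
                    + " rejected; current generation is " + currentGenerationId);
        }
    }

    public static void main(String[] args) {
        validateGeneration(5, 5); // in-sync client passes
        try {
            validateGeneration(4, 5); // zombie client with a stale generation
        } catch (IllegalGenerationException e) {
            System.out.println("zombie fenced: " + e.getMessage());
        }
    }
}
```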

We also need to introduce a new on-disk format for transaction state in order to persist both the transactional group id and the transactional id. We propose to use a separate txn key type to store the group assignment. Transaction state records will not change.

Code Block
Key => TransactionalId, TransactionalGroupId
  TransactionalId => String
  TransactionalGroupId => String

And here is a recommended new transactional API usage example:

...

  1. The user must be utilizing both consumer and producer as a complete EOS application,
  2. The user needs to store transactional offsets inside the Kafka group coordinator, not in any other external system, for the sake of fencing,
  3. The user needs to call the new producer.initTransactions(consumer), which passes in a consumer struct for state access during initialization,
  4. The producer no longer needs to call sendOffsetsToTransaction(offsets, consumerGroupId) because we will be able to access the consumer group id internally; instead, just pass offsets as a single parameter.
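
Putting the four points together, the intended call sequence might look like the sketch below. The `Producer` and `Consumer` types here are minimal stand-ins for the Kafka clients, kept just detailed enough to show the new flow; they are not the real client classes:

```java
import java.util.HashMap;
import java.util.Map;

public class EosUsageSketch {
    // --- Minimal stand-ins for the Kafka client types (illustrative only) ---
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset) {}

    interface Consumer { Map<TopicPartition, OffsetAndMetadata> position(); }

    static class Producer {
        boolean initialized = false;
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();

        // NEW: consumer group aware initialization (point 3).
        void initTransactions(Consumer consumer) { initialized = true; }

        void beginTransaction() {}

        // NEW: no consumer group id parameter needed (point 4).
        void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
            committed.putAll(offsets);
        }

        void commitTransaction() {}
    }

    public static void main(String[] args) {
        Consumer consumer = () -> Map.of(new TopicPartition("input", 0), new OffsetAndMetadata(42L));
        Producer producer = new Producer();

        producer.initTransactions(consumer);   // pass the consumer for state access
        producer.beginTransaction();
        // ... process records and send transactional results here ...
        producer.sendOffsetsToTransaction(consumer.position()); // offsets only
        producer.commitTransaction();

        System.out.println("committed offsets: " + producer.committed.size());
    }
}
```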

Compatibility, Deprecation, and Migration Plan

...