...

Code Block
interface Producer {
  /**
   * Initialize transactional state for the producer with the partitions assigned
   * in the consumer group rebalance. This call ensures that any transactions
   * involving committed offsets from the set of input partitions are completed
   * before this call returns.
   *
   * Unlike the no-arg initTransactions() API, this can be called multiple times
   * on the same instance. Typically it should be called immediately after receiving
   * a new partition assignment from the group coordinator.
   */
  void initTransactions(GroupAssignment groupAssignment);
}

interface GroupAssignment {
  int generationId();
}

interface ConsumerAssignment extends GroupAssignment {
  Set<TopicPartition> partitions();
}

Here we introduce an intermediate data structure, `GroupAssignment`, to make the API easier to evolve in case we need to add more identification info during the transaction init stage. There are two main differences between the behavior of this API and the pre-existing `initTransactions`:

  • The first is that it is safe to call this API multiple times. In fact, it is required to be invoked after every consumer group rebalance or dynamic assignment. 
  • The second is that it is safe to call after receiving a `ProducerFencedException`. If a producer is fenced, all that is needed is to rejoin the associated consumer group and call this new `initTransactions` API.
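The calling pattern described above can be sketched as follows. This is a minimal, self-contained illustration rather than the actual client code: `RecordingProducer`, `RebalanceSketch`, and `onPartitionsAssigned` are hypothetical names, the two interfaces are stand-ins for the ones proposed above, and partitions are plain strings instead of `TopicPartition` so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Stand-ins mirroring the proposed interfaces (assumption: String replaces TopicPartition).
interface GroupAssignment {
    int generationId();
}

interface ConsumerAssignment extends GroupAssignment {
    Set<String> partitions();
}

// Hypothetical in-memory producer that only records each initialization.
class RecordingProducer {
    final List<Integer> initializedGenerations = new ArrayList<>();

    void initTransactions(GroupAssignment assignment) {
        // A real producer would block here until pending transactions on the
        // assigned input partitions have completed.
        initializedGenerations.add(assignment.generationId());
    }
}

class RebalanceSketch {
    // Hypothetical factory for an assignment snapshot.
    static ConsumerAssignment assignment(int generationId, Set<String> partitions) {
        return new ConsumerAssignment() {
            @Override public int generationId() { return generationId; }
            @Override public Set<String> partitions() { return partitions; }
        };
    }

    // Intended to be called from the application's rebalance callback.
    static void onPartitionsAssigned(RecordingProducer producer, ConsumerAssignment assignment) {
        producer.initTransactions(assignment);
    }

    public static void main(String[] args) {
        RecordingProducer producer = new RecordingProducer();
        onPartitionsAssigned(producer, assignment(1, Set.of("input-0", "input-1")));
        // After a rebalance, or after being fenced and rejoining the group,
        // the same producer instance is initialized again.
        onPartitionsAssigned(producer, assignment(2, Set.of("input-1")));
        System.out.println(producer.initializedGenerations); // prints [1, 2]
    }
}
```

The point of the sketch is that the same producer instance is reused across generations, which the no-arg `initTransactions()` does not allow.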

The new thread producer API will be tightly coupled with the consumer group. We choose to define a new producer config, `transactional.group.id`, to pass in the consumer group id:

Code Block
// ProducerConfig.java
public static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id";
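As a rough illustration of how an application might pass the group id through this config, the sketch below builds producer properties with `java.util.Properties`. The class `TransactionalGroupConfigSketch`, the helper `producerProps`, and the sample values are hypothetical; the key is redefined locally because it is only proposed here and may not exist in a released `ProducerConfig`.

```java
import java.util.Properties;

class TransactionalGroupConfigSketch {
    // Proposed key from the text above, repeated locally (assumption: not yet in ProducerConfig).
    static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id";

    // Hypothetical helper: builds producer properties carrying the consumer group id.
    static Properties producerProps(String bootstrapServers, String consumerGroupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty(TRANSACTIONAL_GROUP_ID, consumerGroupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092", "my-consumer-group");
        System.out.println(props.getProperty(TRANSACTIONAL_GROUP_ID)); // prints my-consumer-group
    }
}
```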

We also attempt to deprecate the following APIs that take the consumer group id:





The key to this proposal is allowing a single transaction coordinator to see the assignments of all members in the group. It can then maintain the invariant that only one producer is allowed to make progress at any time for a particular input partition. To enable this, we need two protocol changes. First, we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group id. Second, we need to extend the InitProducerId API to support consumer-group-aware initialization.
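To make the invariant concrete, here is a toy in-memory model of a coordinator that tracks one owner per input partition. It is not the proposed broker logic: `ToyTransactionCoordinator`, `initProducer`, and `mayProgress` are hypothetical names, and the rule that a newer group generation takes over ownership is an assumption made purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ToyTransactionCoordinator {
    // Owner of an input partition: a producer id plus the generation it initialized with.
    static final class Owner {
        final long producerId;
        final int generationId;

        Owner(long producerId, int generationId) {
            this.producerId = producerId;
            this.generationId = generationId;
        }
    }

    private final Map<String, Owner> owners = new HashMap<>();

    // Returns false (the caller is treated as fenced) if any requested partition
    // is already owned under a newer generation; otherwise takes ownership of all.
    boolean initProducer(long producerId, int generationId, Set<String> partitions) {
        for (String partition : partitions) {
            Owner current = owners.get(partition);
            if (current != null && current.generationId > generationId) {
                return false;
            }
        }
        for (String partition : partitions) {
            owners.put(partition, new Owner(producerId, generationId));
        }
        return true;
    }

    // Only the currently registered owner may make progress on a partition.
    boolean mayProgress(long producerId, String partition) {
        Owner current = owners.get(partition);
        return current != null && current.producerId == producerId;
    }

    public static void main(String[] args) {
        ToyTransactionCoordinator coordinator = new ToyTransactionCoordinator();
        coordinator.initProducer(1L, 1, Set.of("input-0"));
        coordinator.initProducer(2L, 2, Set.of("input-0")); // newer generation takes over
        System.out.println(coordinator.mayProgress(1L, "input-0")); // prints false
        System.out.println(coordinator.mayProgress(2L, "input-0")); // prints true
    }
}
```

Because one coordinator holds the ownership map for the whole group, it can reject the stale producer without consulting any other broker, which is what the group-id-based coordinator lookup is meant to enable.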

...