...

We argue that the root of the problem is twofold:

...

that transaction coordinators have no knowledge of consumer group semantics: they do not understand that a rebalance can move partitions between processes.

...

Our proposal is to make transaction coordinators aware of consumer group assignments. Rather than distributing the transactional state by routing every transactional Id to a separate coordinator, we will use the consumer group Id to identify a single transaction coordinator that is responsible for managing the state. This lets the transaction coordinator know which partitions are being consumed by each member of the group, and it can then use that knowledge to safely coordinate assignment changes.
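To make the routing change concrete, here is a minimal sketch of how choosing the lookup key changes which coordinator owns a producer's state. It assumes coordinators are selected by hashing a key onto a partition of an internal transaction log, as Kafka does today. The partition count, the `Math.floorMod` hash, and the names `partitionFor` and `coordinatorPartition` are illustrative assumptions, not part of the proposal.

```java
import java.util.List;

public class CoordinatorRouting {
    // Assumed partition count for the internal transaction log (illustrative only).
    static final int TXN_LOG_PARTITIONS = 50;

    // Hash a routing key onto one partition of the transaction log. The broker
    // leading that partition acts as the coordinator for the key.
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical routing rule: use the consumer group Id when present,
    // otherwise fall back to per-transactional-Id routing.
    static int coordinatorPartition(String consumerGroupId, String transactionalId, int numPartitions) {
        String key = (consumerGroupId != null) ? consumerGroupId : transactionalId;
        return partitionFor(key, numPartitions);
    }

    public static void main(String[] args) {
        List<String> members = List.of("app-txn-0", "app-txn-1", "app-txn-2");
        for (String txnId : members) {
            int byTxnId = coordinatorPartition(null, txnId, TXN_LOG_PARTITIONS);
            int byGroup = coordinatorPartition("app-group", txnId, TXN_LOG_PARTITIONS);
            System.out.printf("%s: by transactional id -> %d, by group id -> %d%n",
                    txnId, byTxnId, byGroup);
        }
    }
}
```

With transactional-Id routing, each member's state may land on a different coordinator. With group-Id routing, every member maps to the same partition, so a single coordinator can see the whole group's assignment.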

Currently, the transaction coordinator uses the initTransactions API to fence producers that share the same transactional Id and to ensure that previous transactions have been completed. We propose to base this guarantee on the consumer group instead. We will add a new initTransactions API that accepts the set of assigned partitions and the associated consumer group Id. This information will be passed to the transaction coordinator in the InitProducerId call and stored with the rest of the transaction state.


Essentially, the problem we are trying to solve is making the coordinator aware of the dependencies between processes that arise from partition reassignment. When handling the InitProducerId request, the coordinator will use the consumer group's previous partition assignment to check which transactions need to be completed before it is safe to begin processing. It will then ensure that only one producer for each assigned partition is allowed to make progress at any time.
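The dependency check described above can be sketched as an in-memory tracker. This is a hypothetical illustration, not the coordinator's actual data model. The class name `GroupTxnTracker`, its methods, and the use of plain strings such as `"input-0"` for input partitions are all assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of group-aware fencing inside a transaction coordinator. */
public class GroupTxnTracker {
    // Input partition -> producer id whose open transaction commits offsets for it.
    private final Map<String, Long> pendingByPartition = new HashMap<>();

    /** Record offsets for a partition in producerId's open transaction. */
    public void addOffsetsToTxn(long producerId, String partition) {
        Long owner = pendingByPartition.get(partition);
        // Only one producer per input partition may make progress at a time.
        if (owner != null && owner != producerId) {
            throw new IllegalStateException(partition + " is held by producer " + owner);
        }
        pendingByPartition.put(partition, producerId);
    }

    /** Producers whose open transactions must complete before producerId may
     *  process its newly assigned partitions. */
    public Set<Long> blockingProducers(long producerId, Set<String> assignedPartitions) {
        Set<Long> blockers = new HashSet<>();
        for (String partition : assignedPartitions) {
            Long owner = pendingByPartition.get(partition);
            if (owner != null && owner != producerId) {
                blockers.add(owner);
            }
        }
        return blockers;
    }

    /** Commit or abort: release every partition held by producerId. */
    public void completeTxn(long producerId) {
        pendingByPartition.values().removeIf(owner -> owner == producerId);
    }

    public static void main(String[] args) {
        GroupTxnTracker tracker = new GroupTxnTracker();
        tracker.addOffsetsToTxn(1L, "input-0");
        tracker.addOffsetsToTxn(1L, "input-1");
        // A rebalance moves input-1 to producer 2; producer 1's transaction blocks it.
        System.out.println(tracker.blockingProducers(2L, Set.of("input-1", "input-2")));
        tracker.completeTxn(1L);
        System.out.println(tracker.blockingProducers(2L, Set.of("input-1", "input-2")));
    }
}
```

The main method prints `[1]` and then `[]`. After the rebalance, producer 2 must wait until producer 1's transaction on `input-1` completes. Once that transaction is committed or aborted, producer 2 can proceed.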

...

Code Block
interface Producer {
  /**
   * Initialize transactional state for the producer with the partitions assigned
   * in the consumer group rebalance. This call ensures that any transactions
   * involving committed offsets for the assigned input partitions are completed
   * before it returns.
   *
   * Unlike the no-arg initTransactions() API, this can be called multiple times
   * on the same instance. Typically it should be called immediately after receiving
   * a new partition assignment from the group coordinator.
   */
  void initTransactions(GroupAssignment groupAssignment);
}

public interface GroupAssignment {
  int generationId();
}

...

Code Block (Producer.java)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId); // DEPRECATED
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets); // NEW


We could effectively leave the `transactional.id` config unset, because we no longer use it to revoke ongoing transactions. Instead, applications that rely on group membership would use the consumer group Id. To enable this, we need two protocol changes. First, we need to update the FindCoordinator API to support looking up the transaction coordinator by consumer group Id. Second, we need to extend the InitProducerId API to support consumer-group-aware initialization.

...

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ConsumerGroupId ConsumerGroupGeneration
  TransactionalId => NullableString
  TransactionTimeoutMs => Int32
  ConsumerGroupId => NullableString         // NEW
  ConsumerGroupGeneration => Int32          // NEW

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int32
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16
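The extended request body can be illustrated with a rough encoding sketch. It covers only the request fields in schema order and leaves out the request header and any flexible-version tagged fields. It follows Kafka's classic encoding: big-endian integers, and a NullableString written as an Int16 length (-1 for null) followed by UTF-8 bytes. TransactionTimeoutMs is written as Int32, matching the existing Kafka InitProducerId schema. The group field name `ConsumerGroupId` and the class `InitProducerIdRequestEncoder` are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder for the proposed InitProducerId request body. */
public class InitProducerIdRequestEncoder {

    static int nullableStringSize(String s) {
        return 2 + (s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length);
    }

    // NullableString: Int16 length (-1 means null), then the UTF-8 bytes.
    static void writeNullableString(ByteBuffer buf, String s) {
        if (s == null) {
            buf.putShort((short) -1);
            return;
        }
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("string too long for Int16 length prefix");
        }
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    public static ByteBuffer encode(String transactionalId, int transactionTimeoutMs,
                                    String consumerGroupId, int consumerGroupGeneration) {
        int size = nullableStringSize(transactionalId) + 4
                + nullableStringSize(consumerGroupId) + 4;
        ByteBuffer buf = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        writeNullableString(buf, transactionalId);  // TransactionalId
        buf.putInt(transactionTimeoutMs);           // TransactionTimeoutMs
        writeNullableString(buf, consumerGroupId);  // ConsumerGroupId (NEW)
        buf.putInt(consumerGroupGeneration);        // ConsumerGroupGeneration (NEW)
        buf.flip();                                 // prepare the buffer for reading
        return buf;
    }
}
```

For example, `encode(null, 60000, "group-a", 5)` produces 19 bytes: 2 for the null TransactionalId, 4 for the timeout, 9 for the group Id, and 4 for the generation. A group-aware producer can leave TransactionalId null, which fits the point above that `transactional.id` may be left unset.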

...