...

Code Block
interface Producer {
  /**
   * Initialize transactional state for the producer with the partitions assigned
   * in the consumer group rebalance. This call ensures that any transactions
   * involving committed offsets from the set of input partitions must be completed
   * before this call returns. 
   *
   * Unlike the no-arg initTransactions() API, this can be called multiple times
   * on the same instance. Typically it should be called immediately after receiving
   * a new partition assignment from the group coordinator.
   */
  void initTransactions(GroupAssignment groupAssignment);
}

public interface GroupAssignment {
  int generationId();
}

public interface ConsumerAssignment extends GroupAssignment {
  Set<TopicPartition> partitions();
}

Here we introduce an intermediate data structure `GroupAssignment` to make future evolution easier in case we need to add more identification info during the transaction init stage. There are two main differences in behavior between this API and the pre-existing `initTransactions`:

...
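
As a hedged illustration of the interfaces above, here is a minimal sketch of how an application could implement `ConsumerAssignment` to carry its rebalance metadata into `initTransactions()`. The class name and constructor are assumptions for the example, not part of the proposal.

Code Block
languagejava
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Hypothetical helper (not part of the proposal) implementing the proposed interface.
public class RebalanceAssignment implements ConsumerAssignment {
  private final Set<TopicPartition> partitions;
  private final int generationId;

  public RebalanceAssignment(Set<TopicPartition> partitions, int generationId) {
    this.partitions = partitions;
    this.generationId = generationId;
  }

  @Override
  public Set<TopicPartition> partitions() {
    return partitions;
  }

  @Override
  public int generationId() {
    return generationId;
  }
}

// Illustrative usage from a rebalance callback:
//   producer.initTransactions(new RebalanceAssignment(assignedPartitions, generationId));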

The key to this proposal is allowing a single transaction coordinator to see the assignments of all members working in the same application group. It can then maintain the invariant that only one producer is allowed to make progress at any time for a particular input partition. To enable this, we need two protocol changes. First we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group Id. Second, we need to extend the InitProducerId API to support consumer group aware initialization.

...

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ConsumerGroupId ConsumerGroupGeneration AssignedPartitions
  TransactionalId => NullableString
  TransactionTimeoutMs => Int64
  ConsumerGroupId => NullableString         // NEW
  ConsumerGroupGeneration => Int32          // NEW
  AssignedPartitions => [Topic [Partition]] // NEW
    Topic => String
    Partition => Int32

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int64
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16

The new InitProducerId API accepts either a user-configured transactional Id or a consumer group Id and the set of assigned partitions. When a consumer group is provided, the transaction coordinator will check whether there are any ongoing transactions that include the assigned partitions. If there are, these transactions will be aborted and the corresponding producers will be fenced by bumping their epochs. Once transactions are complete, the call will return.
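
To make the described behavior concrete, the sketch below is a toy, illustrative-only model of the invariant the transaction coordinator maintains; the class and method names are assumptions and do not reflect the actual broker implementation.

Code Block
languagejava
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Toy model: at most one producer may make progress on a given input partition.
public class PartitionOwnershipModel {

  private static class OngoingTxn {
    final long producerId;
    short producerEpoch;

    OngoingTxn(long producerId, short producerEpoch) {
      this.producerId = producerId;
      this.producerEpoch = producerEpoch;
    }
  }

  // Partition -> the ongoing transaction that includes committed offsets for it.
  private final Map<TopicPartition, OngoingTxn> ongoing = new HashMap<>();

  // Handle a group-aware InitProducerId: abort and fence any ongoing transaction
  // on the assigned partitions before the new producer is allowed to proceed.
  public void onGroupAwareInitProducerId(List<TopicPartition> assignedPartitions) {
    for (TopicPartition tp : assignedPartitions) {
      OngoingTxn txn = ongoing.remove(tp);
      if (txn != null) {
        txn.producerEpoch += 1; // bump the epoch so the previous producer is fenced
        // ... the pending transaction would be aborted here before the call returns
      }
    }
  }
}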

...

Code Block
Key => GroupId TransactionalId
  GroupId => String
  TransactionalId => String

Value => GenerationId AssignedPartitions
  GenerationId => Int32
  AssignedPartitions => [Topic [Partition]]
    Topic => String
    Partition => Int32

To be able to upgrade a Kafka Streams application to leverage this new feature, a new config shall be introduced to control the producer upgrade decision:

Code Block
languagejava
titleStreamsConfig.java
public static final String CONSUMER_GROUP_AWARE_TRANSACTION = "consumer.group.aware.transaction"; // default to true

When this config is set to true and exactly-once is turned on, the Kafka Streams application will use a single producer per thread.
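
For example, a Streams application opting into the new behavior could be configured as sketched below; the application id and bootstrap servers are placeholders, and the config key is the one proposed above.

Code Block
languagejava
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class EosUpgradeConfigExample {
  public static Properties buildStreamsConfig() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    // Proposed config from this KIP (defaults to true); set it to false to keep the
    // old producer-per-task model during a staged upgrade.
    props.put("consumer.group.aware.transaction", "true");
    return props;
  }
}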

Fencing for upgrades

To fence an old producer accessing the same topic partition, we will introduce a new exception type:

Code Block
languagejava
titleErrors.java
CONCURRENT_PRODUCER_COMMIT(85, "This producer attempted to commit offset to a topic partition which is owned by another producer in this generation.", ConcurrentProducerCommitException::new),

Also, we need to make sure we fence old producers rather than new ones, so a new generation id field shall be added to the `TxnOffsetCommitRequest` request:

Code Block
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId
  TransactionalId     => String
  GroupId             => String
  ProducerId          => Int64
  ProducerEpoch       => Int16
  Offsets             => Map<TopicPartition, CommittedOffset>
  GenerationId        => Int32 // NEW


And to avoid concurrent processing due to the upgrade, we also want to introduce an exception to let the consumer back off:

Code Block
languagejava
titleErrors.java
PENDING_TRANSACTION(86, "There are pending transactions that need to be cleared before proceeding.", PendingTransactionException::new),

We will discuss this in more detail in the Compatibility section.

Example

Below we provide an example of a simple read-process-write loop with consumer group-aware EOS processing.

Code Block
linenumberstrue
  String consumerGroupId = "group";
  Set<String> topics = buildSubscription();  
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(consumerGroupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig());

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
      void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
        // On assignment, call initTransactions() in order to ensure any
        // transactions involving committed offsets from the assigned partitions
        // have been completed
        producer.initTransactions(new TxnProducerIdentity(partitions, consumerGroupId, generationId));
      }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    producer.sendOffsetsToTransaction(consumedOffsets, consumerGroupId);
    producer.commitTransaction();
  }

...

This is a server-client integrated change, and it is required to upgrade the broker first with `inter.broker.protocol.version` set to the latest version. Any produce request with a higher version will automatically get fenced because of no support. If this is the case for a Kafka Streams application, it is recommended to unset the `CONSUMER_GROUP_AWARE_TRANSACTION` config as necessary so that the client can be upgraded without using the new thread producer.

To make the upgrade completely compatible with current EOS transaction semantics, we need to be able to distinguish clients that are making progress on the same input source but using different transactional ids. It is possible to have two different types of clients within the same consumer group. Imagine a Kafka Streams application where half of the instances use the old task producer API while the other half use the new consumer group API. This fencing can be done by leveraging the transactional offset commit protocol, which contains a consumer group id and topic partitions. The group coordinator can build a reverse mapping from topic partition to [producer.id, generationId] and remember which producer has been contributing offset commits to each specific topic partition. When we upgrade to the new API, the group coordinator will actively check this map upon receiving `TxnOffsetCommit`. If the stored `producer.id` doesn't match the one in the request, and the generation id in the request is either undefined or smaller than the one currently stored, the coordinator will send a `ConcurrentProducerCommitException` in the response to shut down the conflicting producer immediately. This ensures a smooth upgrade without worrying about old pending transactions. It also suggests that running two types of clients in the same application is not recommended, as it makes the fencing much harder.
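
As an illustration only, the check described above could be modeled roughly as follows; the class, field, and method names are assumptions and not actual group coordinator code.

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Toy model of the reverse mapping from topic partition to [producer.id, generationId].
public class TxnOffsetCommitFencingModel {

  private static class LastCommitter {
    final long producerId;
    final int generationId;

    LastCommitter(long producerId, int generationId) {
      this.producerId = producerId;
      this.generationId = generationId;
    }
  }

  private final Map<TopicPartition, LastCommitter> lastCommitter = new HashMap<>();

  // Returns true if the TxnOffsetCommit should be rejected with CONCURRENT_PRODUCER_COMMIT.
  // A null generationId models a request from an old client that does not send the field.
  public boolean shouldFence(TopicPartition partition, long producerId, Integer generationId) {
    LastCommitter stored = lastCommitter.get(partition);
    if (stored == null || stored.producerId == producerId)
      return false; // first commit for this partition, or the same producer as before
    // A different producer must prove a newer generation to take over the partition.
    return generationId == null || generationId < stored.generationId;
  }

  public void recordCommit(TopicPartition partition, long producerId, int generationId) {
    lastCommitter.put(partition, new LastCommitter(producerId, generationId));
  }
}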

Besides an active fencing mechanism, we also need to ensure 100% correctness during the upgrade. This means no input data should be processed twice, even though we can no longer distinguish clients by transactional id. The solution is to reject the consumer's offset fetch request by sending a PendingTransactionException to the new client when there are pending transactional offset commits, so that the new client starts from a clean state instead of relying on transactional id fencing. Since this would be an unknown exception to old consumers, we will instead send a COORDINATOR_LOAD_IN_PROGRESS exception to let them retry. When the client receives a PendingTransactionException, it will back off and retry fetching the input offsets until all pending transactional offsets are cleared. This is a trade-off between availability and correctness; the worst case for availability is waiting out the transaction timeout of one minute, which should be a trivial one-time cost during the upgrade only.
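
Conceptually, the new client's back-off behavior could look like the sketch below. This is illustrative only; in practice the consumer client would retry internally, and the package location of the proposed PendingTransactionException as well as the helper method name are assumptions.

Code Block
languagejava
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.PendingTransactionException; // proposed in this KIP

public class PendingTxnBackoff {
  // Fetch the last committed offsets, backing off while pending transactional
  // offset commits are still being cleared on the group coordinator.
  static Map<TopicPartition, OffsetAndMetadata> committedWithBackoff(
      Consumer<?, ?> consumer, Set<TopicPartition> partitions) throws InterruptedException {
    while (true) {
      try {
        return consumer.committed(partitions);
      } catch (PendingTransactionException e) {
        Thread.sleep(1000); // back off and retry until pending transactions are cleared
      }
    }
  }
}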

...

  • Producer Pooling:
  • Producer support for multiple transactional ids:
  • Tricky rebalance synchronization:
  • We could use the admin client to fetch the inter.broker.protocol version on startup to choose which type of producer to use. However, this approach is harder than expected, because brokers may be on different versions, and requiring users to handle the tricky behavior during an upgrade would be unfavorable. So a hard-coded config is the better option at hand.
  • We considered leveraging the transaction coordinator to remember the assignment info for each transactional producer; however, this would copy the state data into two separate locations, which could easily go out of sync. We choose to keep using the group coordinator for the generation and partition fencing in this case.