...

Code Block
interface Producer {
  /**
   * Initialize transactional state for the producer with the partitions assigned
   * in the consumer group rebalance. This call ensures that any transactions
   * involving committed offsets from the set of input partitions are completed
   * before this call returns.
   *
   * Unlike the no-arg initTransactions() API, this can be called multiple times
   * on the same instance. Typically it should be called immediately after receiving
   * a new partition assignment from the group coordinator.
   */
  void initTransactions(TxnProducerIdentity newIdentity);
}

public class TxnProducerIdentity {
  final Set<TopicPartition> inputPartitions;
  final String consumerGroupId;
  final int generationId;

  public TxnProducerIdentity(Set<TopicPartition> inputPartitions, String consumerGroupId, int generationId);
}

Here we introduce an intermediate data structure, `TxnProducerIdentity`, to make future evolution easier in case we need to add more identification info during the transaction init stage. There are two main differences between the behavior of this API and the pre-existing `initTransactions`:

  • The first is that it is safe to call this API multiple times. In fact, it is required to be invoked after every consumer group rebalance or dynamic assignment.
  • The second is that it is safe to call after receiving a `ProducerFencedException`. If a producer is fenced, all that is needed is to rejoin the associated consumer group and call this new `initTransactions` API, as sketched below.
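
To make the second point concrete, here is a minimal sketch (not part of the proposed public API changes) of how a fenced producer could recover under this model. It assumes the consumer, producer, `groupId`, and processed records are set up as in the example further below, and that `currentPartitions` and `currentGenerationId` are hypothetical variables kept up to date by the rebalance listener:

Code Block
try {
  producer.beginTransaction();
  for (ProducerRecord record : processed)
    producer.send(record);
  producer.sendOffsetsToTransaction(consumedOffsets, groupId);
  producer.commitTransaction();
} catch (ProducerFencedException e) {
  // No need to close or recreate the producer: rejoin the consumer group
  // (picked up on the next poll) and re-initialize transactional state with
  // the latest assignment and generation.
  producer.initTransactions(
      new TxnProducerIdentity(currentPartitions, groupId, currentGenerationId));
}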

...

Code Block
StreamsConfig.java
public static final String CONSUMER_GROUP_AWARE_TRANSACTION = "consumer.group.aware.transaction"; // default to true

When set to true and exactly-once is turned on, the Kafka Streams application will use a single producer per thread. Alternatively, we could use the admin client to fetch the inter.broker.protocol version on startup to decide which type of producer to use, as sketched below.
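
As an illustration of that alternative (not the actual Streams implementation), a sketch along the following lines could query the broker config at startup; the `ProducerModeChooser` helper, the broker id "0", and the "2.5" version cut-off are assumptions made only for this example:

Code Block
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ProducerModeChooser {
  // Hypothetical helper: returns true if the brokers are new enough to use
  // the group-aware (single producer per thread) transaction model.
  public static boolean useThreadLevelProducer(Properties adminProps) throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
      ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
      Config config = admin.describeConfigs(Collections.singleton(broker))
                           .all().get().get(broker);
      String ibp = config.get("inter.broker.protocol.version").value();
      // Naive string comparison, for illustration only.
      return ibp.compareTo("2.5") >= 0;
    }
  }
}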

...

Code Block
  String groupId = "group";
  Set<String> topics = buildSubscription();  
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig());

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
      public void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
        // On assignment, call initTransactions() in order to ensure any
        // transactions involving committed offsets from the assigned partitions
        // have been completed
        producer.initTransactions(new TxnProducerIdentity(partitions, groupId, generationId));
      }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    producer.sendOffsetsToTransaction(consumedOffsets, groupId);
    producer.commitTransaction();
  }

...

Besides an active fencing mechanism, we also need to ensure 100% correctness during upgrade. This means no input data should be processed twice, even though we can no longer distinguish clients by transactional id. The solution is to reject the offset fetch request with a PendingTransactionException to the new client when there are pending transactional offset commits, so that the new client starts from a clean state instead of relying on transactional id fencing. Since this would be an unknown exception to old consumers, we will send a COORDINATOR_LOAD_IN_PROGRESS error to them instead so that they retry. When a client receives PendingTransactionException, it will back off and retry fetching the input offsets until all the pending transactional offsets are cleared, as sketched below. This is a trade-off between availability and correctness; the worst case for availability is waiting out the transaction timeout of about one minute, which should be a trivial cost incurred only during upgrade.
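
A rough sketch of that retry behavior is shown below. PendingTransactionException is the new exception proposed here (so this will not compile against current clients), `fetchCommittedOffsets` is a hypothetical helper, and the back-off values are illustrative:

Code Block
// Back off and retry the offset fetch while the group coordinator still has
// pending transactional offset commits for the assigned partitions.
static Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(
    Consumer<?, ?> consumer, Set<TopicPartition> assigned) throws InterruptedException {
  long backoffMs = 100;
  while (true) {
    try {
      return consumer.committed(assigned, Duration.ofSeconds(5));
    } catch (PendingTransactionException e) {
      // Pending offsets clear once the old transactions commit, abort, or hit
      // the transaction timeout, so the worst-case wait is about one minute.
      Thread.sleep(backoffMs);
      backoffMs = Math.min(backoffMs * 2, 5_000);
    }
  }
}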

Rejected Alternatives

  • Have a new API to proactively abort ongoing transactions:
  • Producer Pooling:
  • Producer support for multiple transactional ids:
  • Tricky rebalance synchronization:

...