...

Note that the current default transaction.timeout is set to one minute, which is too long for Kafka Streams EOS use cases. Since the default commit interval is only 100 ms, a member that does not actively commit offsets within that tight window would hit the session timeout long before the transaction timeout expires. So we suggest shrinking the transaction timeout to the same default value as the session timeout (10 seconds), to reduce the potential performance loss from offset fetch delay.
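
For illustration only, here is a minimal sketch of what shrinking the transaction timeout could look like on a Streams app today, assuming the existing ProducerConfig.TRANSACTION_TIMEOUT_CONFIG and StreamsConfig.producerPrefix helpers:

Code Block
languagejava
Properties props = new Properties();
// Align the transaction timeout with the default session timeout (10 seconds)
// instead of the 1 minute producer default; under EOS offsets are committed
// roughly every 100 ms anyway, so a longer timeout only delays offset fetches.
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10_000);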

...

Code Block
interface Producer {
  /**
   * This API shall be called for consumer group aware transactional producers.
   */
  void initTransactions(Consumer<byte[], byte[]> consumer); // NEW

  /**
   * No longer need to pass in the consumer group id in a case where we already get access to the consumer state.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);	 // NEW
}

Here we introduce an intermediate data structure `GroupAssignment` just to make future evolution easier in case we need to add more identification info during the transaction init stage. There are two main differences in the behavior of this API compared to the pre-existing `initTransactions`:

...
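
To make the intended call sequence of the new APIs above concrete, here is a minimal sketch; the config objects and the consumedOffsets map are placeholders:

Code Block
languagejava
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig);

producer.initTransactions(consumer);                 // NEW: consumer group aware init

producer.beginTransaction();
// ... send processed records ...
producer.sendOffsetsToTransaction(consumedOffsets);  // NEW: no consumer group id needed
producer.commitTransaction();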

The new thread producer API will be tightly coupled with the consumer group. We choose to define a new producer config `transactional.group.id` to pass in the consumer group id:

Code Block
languagejava
titleProducerConfig.java
public static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id";
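
As an illustration, a plain EOS producer could pass its consumer group id through the proposed config like this (the group name is hypothetical):

Code Block
languagejava
Properties producerProps = new Properties();
// Pass the consumer group id as the transactional group id so the transaction
// coordinator can tie this producer to the group's membership.
producerProps.put(ProducerConfig.TRANSACTIONAL_GROUP_ID, "my-consumer-group");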

And we attempt to deprecate the sendOffsetsToTransaction API overload which takes the consumer group id:

...

...

This config value will be automatically set to the consumer group id in a Kafka Streams app. Plain EOS users need to manually configure it to the consumer group id if they are not following the recommended practice below of calling `initTransactions(consumer)`.

We could effectively unset the `transactional.id` config in Kafka Streams because we no longer use it for revoking ongoing transactions. Instead we would stick to the consumer group id when we rely on group membership. To enable this, we need two protocol changes. First we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group id. Second, we need to extend the InitProducerId API to support consumer group aware initialization. The schema for FindCoordinator does not need to change, but we need to add a new coordinator type:

Code Block
FindCoordinatorRequest => CoordinatorKey CoordinatorType
  CoordinatorKey => STRING
  CoordinatorType => INT8 // 0 -> Consumer group coordinator, 1 -> Transaction coordinator, 2 -> Transaction "group" coordinator

Below we provide the new InitProducerId schema:

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs TransactionalGroupId
  TransactionalId => NullableString
  TransactionTimeoutMs => Int64
  TransactionalGroupId => NullableString         // NEW

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int64
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16

The new InitProducerId API accepts either a user-configured transactional id, or a transactional group id together with a generation id. When a transactional group is provided, the transaction coordinator will honor the transactional group id and allocate a new producer.id every time initTransactions is called.

Fencing zombie

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we include the consumer group generation. Once the coordinator observes a generation bump for a group, it will refuse to handle requests from the previous generation. The only thing other group members can do is call InitProducerId themselves. This in fact would be the common case since transactions will usually be completed before a consumer joins a rebalance.

In order to pass the group generationId to `initTransaction`, we need to expose it from the consumer. We propose to add a new function call on consumer to expose the generation info:

Code Block
languagejava
titleConsumer.java
public Generation generation();

With this proposal, the transactional id is no longer needed for proper fencing, but the coordinator still needs a way to identify producers individually as they are executing new transactions. There are two options: continue using the transactional id, or use the producer id which is generated by the coordinator in InitProducerId. Either way, the main challenge is authorization. We currently use the transactional id to authorize transactional operations. In this KIP, we could instead utilize the consumer group id for authorization. The producer must still provide a transactional id if it is running in standalone mode though.

We also need to change the on-disk format for transaction state in order to persist both the consumer group id and the assigned partitions. We propose to use a separate record type in order to store the group assignment. Transaction state records will not change.

Code Block
Key => GroupId TransactionalId
  GroupId => String
  TransactionalId => String

Value => GenerationId AssignedPartitions
  GenerationId => Int32
  AssignedPartitions => [Topic [Partition]]
    Topic => String
    Partition => Int32

To fence an old producer accessing the group, we will return an ILLEGAL_GENERATION exception. A new generation id field shall be added to the `TxnOffsetCommitRequest` request:

Code Block
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId
  TransactionalId     => String
  GroupId             => String
  ProducerId          => int64
  ProducerEpoch       => int16
  Offsets             => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32 // NEW

To be able to upgrade a Kafka Streams application to leverage this new feature, a new config shall be introduced to control the producer upgrade decision:

Code Block
languagejava
titleStreamsConfig.java
public static final String CONSUMER_GROUP_AWARE_TRANSACTION = "consumer.group.aware.transaction"; // default to true

When set to true and exactly-once is turned on, the Kafka Streams application will choose to use a single producer per thread.
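
For illustration, a sketch of how a Streams app might opt in, assuming the existing StreamsConfig constants; the application id and topology are placeholders:

Code Block
languagejava
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");                 // also serves as the consumer group id
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); // turn on EOS
props.put(StreamsConfig.CONSUMER_GROUP_AWARE_TRANSACTION, true);                  // proposed new config, defaults to true
KafkaStreams streams = new KafkaStreams(topology, props);                         // topology built elsewhere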

Fencing for upgrades

And to avoid concurrent processing due to the upgrade, we also want to introduce an exception to let the consumer back off:

Code Block
languagejava
titleErrors.java
PENDING_TRANSACTION(86, "There are pending transactions that need to be cleared before proceeding.", PendingTransactionException::new),

We will discuss this in more detail in the Compatibility section.
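
As a sketch of how a consumer might handle the proposed error, assuming it surfaces from poll() during the upgrade window and using an arbitrary one second backoff:

Code Block
languagejava
while (running) {
  try {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
    process(records); // hypothetical processing helper
  } catch (PendingTransactionException e) {
    // Transactions from the old producers have not been cleared yet; back off
    // instead of processing data whose offsets may still be pending.
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }
}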

Example

Below we provide an example of a simple read-process-write loop with consumer group-aware EOS processing.

Here, a small optimization to help us reuse the producer.id is to piggy-back the consumer member.id as the transactional.id field during the first rebalance and keep it unchanged afterwards. One more optimization is to use `group.instance.id` if it is set on the consumer side, since it is guaranteed to be unique.
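
A rough sketch of this optimization, where groupInstanceId and memberIdFromFirstRebalance are hypothetical placeholders for values obtained from the consumer:

Code Block
languagejava
// Prefer the static group.instance.id when configured, since it is unique and
// stable across restarts; otherwise fall back to the member.id obtained from
// the first rebalance and keep it unchanged afterwards.
String transactionalId = (groupInstanceId != null) ? groupInstanceId : memberIdFromFirstRebalance;
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);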

The main challenge when we choose a random transactional.id is authorization. We currently use the transactional id to authorize transactional operations. In this KIP, we could instead utilize the transactional.group.id (i.e. the consumer group id) for authorization. The producer must still provide a transactional id if it is running in standalone mode though.

Fencing zombie

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage the group coordinator to fence out clients from an older generation.

A new generation id field shall be added to the `TxnOffsetCommitRequest` request:

Code Block
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId
  TransactionalId     => String
  GroupId             => String
  ProducerId          => int64
  ProducerEpoch       => int16
  Offsets             => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32 // NEW

If the generation.id does not match the group generation, the client will be fenced immediately. An edge case is illustrated below:

Code Block
1. Client A tries to commit offsets for topic partition P1, but doesn't get the chance to do the txn offset commit before a long GC pause.
2. Client A falls out of sync and becomes a zombie due to session timeout; the group rebalances.
3. Another client B gets assigned P1.
4. Client B doesn't see any pending offsets because A hasn't committed anything, so it proceeds with potentially `pending` input data.
5. Client A comes back online and continues trying to do the txn commit. Here, with the generation.id check, we will catch it!

We also need to apply a new on-disk format for transaction state in order to persist both the transactional group id and transactional id. We propose to use a separate txn key type in order to store the group assignment. Transaction state records will not change.

Code Block
Key => TransactionalId TransactionalGroupId
  TransactionalId => String
  TransactionalGroupId => String

And here is a recommended new transactional API usage example:

Code Block
linenumberstrue
  String consumerGroupId = "group";
  Set<String> topics = buildSubscription();  
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(consumerGroupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig(consumerGroupId)); // passing in consumer group id as transactional.group.id
  producer.initTransactions(new GroupAssignment());

  Generation generation = consumer.generation();
  consumer.subscribe(topics, new ConsumerRebalanceListener() {
    void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      generation = consumer.generation();
    }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    
    try {
      if (generation == null) {
        throw new IllegalStateException("unexpected consumer state");
      }
      producer.sendOffsetsToTransaction(consumedOffsets, generation.generationId);
    } catch (IllegalGenerationException e) {
      throw e; // fail the zombie member if generation doesn't match
    }

    producer.commitTransaction();
  }

...