...

Code Block

  Set<String> topics = buildSubscription();
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId));
  KafkaProducer producer = new KafkaProducer();
  producer.initTransactions(consumer); // Will access consumer internal state.

  consumer.subscribe(topics);

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);

    try {
      producer.sendOffsetsToTransaction(consumedOffsets);
    } catch (IllegalGenerationException e) {
      throw e; // fail the zombie member if generation doesn't match
    }

    producer.commitTransaction();
  }

...

This is a server-client integrated change, and it is required to upgrade the brokers first, setting `inter.broker.protocol.version` (IBP) to the latest. Any produce request with a higher version than the broker supports will automatically get fenced. For the case of Kafka Streams, we will apply an automated approach: upon application initialization, we use the admin client to send an ApiVersionRequest to the bootstrap servers to learn their current versions. If all brokers' responses indicate that they support the KIP-447 features, we shall initialize the application with the thread-level producer when EOS is on; otherwise we shall fall back to the old producer-per-task semantics.
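The version-probe decision described above can be sketched as a pure function. This is a minimal illustration, not the actual Streams code: `useThreadProducer`, the list of broker API versions, and the assumed version threshold (v3 of TxnOffsetCommit, which carries group metadata) are all hypothetical stand-ins for data that would really come from ApiVersionsResponse.

```java
import java.util.List;

class ProducerModeChooser {
    // Decide whether the thread-level producer can be used: KIP-447 requires
    // every broker in the cluster to support the new transactional APIs.
    // (Illustrative sketch; real clients read this from ApiVersionsResponse.)
    static boolean useThreadProducer(List<Integer> brokerMaxTxnOffsetCommitVersions,
                                     boolean eosEnabled) {
        boolean allSupport447 = brokerMaxTxnOffsetCommitVersions.stream()
                .allMatch(v -> v >= 3); // assumed threshold for 447 support
        return eosEnabled && allSupport447;
    }
}
```

Note that a single downlevel broker forces the old producer-per-task mode, since any broker may become the group or transaction coordinator.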

In the tricky case where a user attempts to downgrade, we shall throw an exception and ask the user to restart their clients with the old API.

We need to ensure 100% correctness during the upgrade. This means no input data should be processed twice, even though we can no longer distinguish clients by transactional id. The solution is to reject the offset fetch request by sending a PendingTransactionException to the new client when there are pending transactional offset commits, so that the new client starts from a clean state instead of relying on transactional id fencing. Since this would be an unknown exception to old consumers, we choose to send them a COORDINATOR_LOAD_IN_PROGRESS error instead, which they already handle by retrying. When a client receives PendingTransactionException, it backs off and retries fetching the input offsets until all pending transactional offsets are cleared. This is a trade-off between availability and correctness; the worst case for availability is waiting out the transaction timeout of about one minute, which should be a trivial one-time cost during the upgrade only.
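The client-side back-off-and-retry described above can be sketched as follows. This is an illustrative model only: `PendingTransactionException`, `OffsetFetcher`, and the `Supplier`-based fetch are hypothetical stand-ins for the consumer's internal offset-fetch path, and the loop is bounded by the transaction timeout as the text describes.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the new error the coordinator returns while
// transactional offset commits are still pending.
class PendingTransactionException extends RuntimeException {}

class OffsetFetcher {
    // Retry the offset fetch with a fixed back-off until the pending
    // transactional offsets clear, giving up after the transaction timeout.
    static long fetchCommittedOffset(Supplier<Long> fetch,
                                     long backoffMs,
                                     long txnTimeoutMs) throws InterruptedException {
        long waitedMs = 0;
        while (true) {
            try {
                return fetch.get();
            } catch (PendingTransactionException e) {
                if (waitedMs >= txnTimeoutMs)
                    throw new IllegalStateException("pending offsets not cleared in time");
                Thread.sleep(backoffMs); // back off before retrying the fetch
                waitedMs += backoffMs;
            }
        }
    }
}
```

The bounded wait mirrors the availability trade-off above: in the worst case the new client idles for one transaction timeout before it can read a clean offset.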

Rejected Alternatives

  • Producer Pooling:
  • Producer support multiple transactional ids:
  • Tricky rebalance synchronization:
  • We could use the admin client to fetch `inter.broker.protocol` on start to choose which type of producer to use. This approach, however, is harder than expected, because brokers may be on different versions, and requiring the user to handle the tricky behavior during upgrade would be unfavorable. So a hard-coded config is the better option we have at hand.
  • We have considered leveraging the transaction coordinator to remember the assignment info for each transactional producer. However, this would copy the state data into two separate locations, which could easily go out of sync. We choose to still use the group coordinator for generation and partition fencing in this case.

...