...

  Set<String> topics = buildSubscription();
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  KafkaProducer producer = new KafkaProducer();

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
    void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      // Will access consumer internal state. Only called once in the app's life cycle after first rebalance.
      // Note that this is a blocking call until consumer successfully joins the group.
      producer.initTransactions(consumer);
    }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    
    try {
      producer.sendOffsetsToTransaction(consumedOffsets);
    } catch (IllegalGenerationException e) {
      throw e; // Fail the zombie member if the generation doesn't match
    }
    producer.commitTransaction();
  }

The main points are the following:

  1. A producer calling the new initTransactions will block until the consumer successfully joins the group and gets a valid generation for the first time.
  2. Generation.id will be used for group coordinator fencing.
  3. We no longer need to close the producer after a rebalance.
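
The generation-based fencing in point 2 can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not Kafka's coordinator code: `ToyGroupCoordinator`, `StaleGenerationException`, `rebalance`, `commitOffset` and `committed` are hypothetical names invented for the example. It assumes the coordinator bumps the generation on every completed rebalance and rejects any offset commit that carries an older generation.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of generation-based fencing; all names here are hypothetical. */
public class GenerationFencingSketch {

    /** Hypothetical stand-in for the error a stale (zombie) member receives. */
    static class StaleGenerationException extends RuntimeException {
        StaleGenerationException(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory stand-in for a group coordinator. */
    static class ToyGroupCoordinator {
        private int currentGeneration = 0;
        private final Map<String, Long> committedOffsets = new HashMap<>();

        /** Each completed rebalance bumps the generation and returns the new id. */
        int rebalance() {
            currentGeneration++;
            return currentGeneration;
        }

        /** Accepts the commit only if the caller's generation is the current one. */
        void commitOffset(int memberGeneration, String partition, long offset) {
            if (memberGeneration != currentGeneration) {
                throw new StaleGenerationException(
                    "generation " + memberGeneration + " is stale; current is " + currentGeneration);
            }
            committedOffsets.put(partition, offset);
        }

        /** Returns the last accepted offset for the partition, or null if none. */
        Long committed(String partition) {
            return committedOffsets.get(partition);
        }
    }

    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();

        // A member joins in the first generation and commits normally.
        int oldGeneration = coordinator.rebalance();
        coordinator.commitOffset(oldGeneration, "topic-0", 10L);

        // The group rebalances; the old member is now a zombie.
        int newGeneration = coordinator.rebalance();
        try {
            coordinator.commitOffset(oldGeneration, "topic-0", 99L);
            System.out.println("zombie commit accepted");
        } catch (StaleGenerationException e) {
            System.out.println("zombie fenced: " + e.getMessage());
        }

        // The member holding the current generation can still commit.
        coordinator.commitOffset(newGeneration, "topic-0", 11L);
        System.out.println("committed offset: " + coordinator.committed("topic-0"));
    }
}
```

In this model the stale commit is rejected without touching the stored offset, which mirrors the role the `IllegalGenerationException` check plays in the loop above: the zombie fails, and the producer held by the current member does not need to be closed and recreated.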

...