Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
linenumbers = true
  // Transactional consume-process-produce loop: every iteration polls a batch, processes it,
  // and writes the produced records together with the consumed offsets in one transaction.
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  // KafkaProducer cannot be constructed without a configuration, so it is built the same way
  // as the consumer. NOTE(review): producerConfig is assumed to be defined alongside
  // consumerConfig -- confirm against the surrounding setup code.
  KafkaProducer producer = new KafkaProducer<>(producerConfig);

  // Will access consumer internal state. Only called once in the app's life cycle after first rebalance.
  // Note that this is a blocking call until consumer successfully joins the group.
  producer.initTransactions(consumer);
  while (true) {
    // Read some records from the consumer and collect the offsets to commit.
    // This poll will be the fencing point if there are pending offsets for the first time.
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed) {
      producer.send(record);
    }

    try {
      producer.sendOffsetsToTransaction(consumedOffsets);
    } catch (IllegalGenerationException e) {
      // Fail the zombie member if generation doesn't match. The catch only rethrows; it is
      // spelled out to make the failure point explicit, and commitTransaction() is never
      // reached for this iteration.
      throw e;
    }
    producer.commitTransaction();
  }

...