Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
linenumberstrue
  String consumerGroupId = "group";
  Set<String> topics = buildSubscription();  
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig(groupId)); // passing in consumer group id as transactional.group.id
  producer.initTransactions(new GroupAssignment());	

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
    void onPartitionsAssigned(Collection<TopicPartition> partitions) {
		// some operation
    }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    try {
		producer.sendOffsetsToTransaction(consumedOffsets, consumer.generation().generationId);
	} catch (IllegalGenerationException e) {
		throw e; // fail the zombie member if generation doesn't match
	}
	
    producer.commitTransaction();
  }

...