Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
linenumberstrue
  // Consume-transform-produce loop: records are read from the subscribed topics,
  // processed, and the results are written together with the consumed offsets
  // inside a single producer transaction.
  //
  // The same group id is used for the consumer configuration, for the identity
  // passed to initTransactions(), and for sendOffsetsToTransaction().
  String consumerGroupId = "group";
  Set<String> topics = buildSubscription();
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(consumerGroupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig());

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
        // On assignment, call initTransactions() in order to ensure any
        // transactions involving committed offsets from the assigned partitions
        // have been completed
        producer.initTransactions(new TxnProducerIdentity(partitions, consumerGroupId, generationId));
      }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed) {
      producer.send(record);
    }
    producer.sendOffsetsToTransaction(consumedOffsets, consumerGroupId);
    producer.commitTransaction();
  }

...