Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

be

Table of Contents

Status

Current state: Under Discussion

...

Code Block
linenumberstrue
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

	// --------- Call before process ---------- //
	// The application could infer output partition beforehand
	Set<TopicPartition> outputPartitions = outputPartition(consumed);
	producer.beginTransaction(outputPartitions, Optional.of(consumer.groupMetadata().groupId()));

    // Do processing while sending out the built records asynchronously
    for (ProducerRecord record : process(consumed))
      producer.send(record);

	// Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();

...

Code Block
linenumberstrue
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

	// --------- Call after process ---------- //
	Set<TopicPartition> outputPartitions = outputPartitions(processed); 
    producer.beginTransaction(outputPartitions, Optional.of(consumer.groupMetadata().groupId()));

    // Write the records and commit offsets under a single transaction
    for (ProducerRecord record : processed)
      producer.send(record);

	// Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();

...