...

A Kafka transaction uses a two-phase commit model, in which the transaction state is stored by the transaction coordinator. Currently, the producer makes this work by sending an AddPartitionsToTxn request each time it sees a new output partition. In practice, however, the set of output partitions for a producer rarely changes and can often be determined when the transaction begins, so updating that set lazily, as new output records reveal new partitions, wastes network round trips.
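To make the round-trip argument concrete, here is a toy Java model (not the Kafka client) that counts partition-registration requests across several transactions under the two strategies. It assumes, as a simplification, one AddPartitionsToTxn round trip for every partition written for the first time within a transaction, versus a single declaration when the transaction begins. The class and method names are hypothetical, and because the real producer may batch several new partitions into one request, the lazy count is an upper bound.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical names, not the Kafka client API) of how many
// partition-registration round trips a transactional producer makes.
final class RegistrationCost {

    // Lazy registration: one round trip the first time each partition is
    // written within a transaction. Registration starts over for every new
    // transaction. Batching in the real producer can only lower this count.
    static int lazyRoundTrips(List<List<String>> transactions) {
        int roundTrips = 0;
        for (List<String> sends : transactions) {
            Set<String> registered = new HashSet<>(); // reset per transaction
            for (String partition : sends) {
                if (registered.add(partition)) { // true only for a new partition
                    roundTrips++;
                }
            }
        }
        return roundTrips;
    }

    // Upfront declaration (assumption): the output partitions are known when
    // the transaction begins, so one registration covers all of them.
    static int upfrontRoundTrips(List<List<String>> transactions) {
        int roundTrips = 0;
        for (List<String> sends : transactions) {
            if (!sends.isEmpty()) {
                roundTrips++;
            }
        }
        return roundTrips;
    }

    public static void main(String[] args) {
        List<String> sends = List.of("out-0", "out-1", "out-0", "out-2", "out-1");
        List<List<String>> tenTransactions = java.util.Collections.nCopies(10, sends);
        System.out.println("lazy:    " + lazyRoundTrips(tenTransactions));
        System.out.println("upfront: " + upfrontRoundTrips(tenTransactions));
    }
}
```

With the five sends shown, each transaction touches three distinct partitions, so `main` prints 30 lazy round trips against 10 upfront declarations; the gap widens as the number of distinct output partitions per transaction grows.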

...

    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // --------- Call before process ---------- //
    // The application may be able to infer the output partitions beforehand
    Set<TopicPartition> outputPartitions = outputPartition(consumed);
    // Proposed overload: declare the output partitions when the transaction begins
    producer.beginTransaction(outputPartitions);

    // Do the processing and send out the records we want to produce.
    // send() is asynchronous; commitTransaction() flushes any records still unsent.
    for (ProducerRecord record : process(consumed))
        producer.send(record);

    // Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();
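The loop above leaves `outputPartition(consumed)` to the application. One case where the output set really is known before processing is a co-partitioned pipeline, where every record read from input partition p is written to partition p of the output topic. The sketch below assumes exactly that 1:1 mapping; the helper name `outputPartitions` and the topic names are hypothetical, and `TopicPartition` here is a local stand-in record for `org.apache.kafka.common.TopicPartition` so the example compiles without the Kafka client.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;

// Local stand-in for org.apache.kafka.common.TopicPartition (assumption:
// keeps the sketch compilable without the Kafka client on the classpath).
record TopicPartition(String topic, int partition) {
    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

final class OutputPartitions {

    // Hypothetical helper: assumes a co-partitioned pipeline, in which a record
    // consumed from input partition p is always produced to partition p of
    // outputTopic. Under that assumption the output set depends only on the
    // input partitions, so it is known before any record is processed.
    static Set<TopicPartition> outputPartitions(Collection<TopicPartition> consumedFrom,
                                                String outputTopic) {
        // TreeSet gives a stable, sorted order; duplicates collapse.
        Set<TopicPartition> result = new TreeSet<>(
            Comparator.comparing(TopicPartition::topic)
                      .thenComparingInt(TopicPartition::partition));
        for (TopicPartition input : consumedFrom) {
            result.add(new TopicPartition(outputTopic, input.partition()));
        }
        return result;
    }

    public static void main(String[] args) {
        var inputs = java.util.List.of(
            new TopicPartition("input", 2),
            new TopicPartition("input", 0),
            new TopicPartition("input", 2));
        System.out.println(outputPartitions(inputs, "output")); // [output-0, output-2]
    }
}
```

With the real client, the input collection would typically come from `consumed.partitions()` (partitions that returned data in this poll) or from `consumer.assignment()`. If records are instead routed by key hash, the set can only be computed in advance by applying the partitioner to every key, which means inspecting the records first.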

...