...
Kafka transactions use a two-phase commit model in which the transaction state is stored by the transaction coordinator. Currently, the way to make this work is to issue an AddPartitionsToTxn call each time we see a new output partition. In practice, however, the set of output partitions for a producer rarely changes and is often deterministic at the beginning of the transaction, so updating that set incrementally as new output records reveal partitions wastes network round trips.
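To make the round-trip argument concrete, here is a toy model, not Kafka client code, that counts coordinator calls for one transaction. It assumes (simplification) that the incremental model issues one AddPartitionsToTxn call per newly seen output partition with no batching, and that a predefined set is registered with a single call when the transaction begins. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: counts coordinator round trips for a single transaction.
// Assumption: incremental registration costs one AddPartitionsToTxn call per
// newly seen partition; a predefined set costs one call at transaction begin.
public class TxnRoundTrips {

    // Incremental model: register a partition the first time a record targets it.
    static int incrementalRoundTrips(List<String> recordPartitions) {
        Set<String> registered = new HashSet<>();
        int roundTrips = 0;
        for (String partition : recordPartitions) {
            if (registered.add(partition)) { // add() returns true only for a new partition
                roundTrips++;               // one AddPartitionsToTxn call
            }
        }
        return roundTrips;
    }

    // Predefined model: the whole output set is sent once, up front.
    static int predefinedRoundTrips(Set<String> outputPartitions) {
        return outputPartitions.isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) {
        // Output partition targeted by each record sent in the transaction.
        List<String> sends = List.of("out-0", "out-1", "out-0", "out-2", "out-1");
        Set<String> known = new LinkedHashSet<>(sends);
        System.out.println("incremental: " + incrementalRoundTrips(sends)); // 3
        System.out.println("predefined:  " + predefinedRoundTrips(known));  // 1
    }
}
```

The gap per transaction grows with the number of distinct output partitions, and it recurs in every transaction because the registered set starts empty each time.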
...
```java
// Read some records from the consumer and collect the offsets to commit.
// This will be the fencing point if there are pending offsets for the first time.
ConsumerRecords<String, String> consumed = consumer.poll(Duration.ofMillis(5000));
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

// --------- Call before process ---------- //
// The application could infer the output partitions beforehand
Set<TopicPartition> outputPartitions = outputPartition(consumed);
// Proposed overload: begin the transaction with a predefined set of output partitions
producer.beginTransaction(outputPartitions);

// Do processing while sending out the built records asynchronously
for (ProducerRecord<String, String> record : process(consumed)) {
    producer.send(record);
}

// Commit the offsets and the records under a single transaction,
// passing the entire consumer group metadata
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
producer.commitTransaction();
```
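The snippet leaves `outputPartition(consumed)` undefined. One way an application might infer output partitions before processing is sketched below, under a hypothetical 1:1 mapping where records read from input partition p are always written to partition p of a single output topic. The local `TopicPartition` record is a stand-in for `org.apache.kafka.common.TopicPartition` so the sketch runs without the client jar, and `output-topic` is a made-up name; with the real client, the input partitions could come from `consumed.partitions()`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class OutputPartitionInference {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for a self-contained sketch).
    record TopicPartition(String topic, int partition) {}

    // Hypothetical output topic name.
    static final String OUTPUT_TOPIC = "output-topic";

    // Hypothetical rule: input partition p maps to partition p of OUTPUT_TOPIC,
    // so the full output set is known before any record is processed.
    static Set<TopicPartition> outputPartitions(List<TopicPartition> consumedFrom) {
        Set<TopicPartition> out = new LinkedHashSet<>(); // de-duplicates, keeps first-seen order
        for (TopicPartition in : consumedFrom) {
            out.add(new TopicPartition(OUTPUT_TOPIC, in.partition()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<TopicPartition> polled = List.of(
                new TopicPartition("input-topic", 2),
                new TopicPartition("input-topic", 0),
                new TopicPartition("input-topic", 2));
        System.out.println(outputPartitions(polled));
    }
}
```

Any deterministic mapping works the same way; the key property is that the set depends only on what was consumed, not on the records produced during processing.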
...