THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
be
Table of Contents |
---|
Status
Current state: Under Discussion
...
Code Block | ||
---|---|---|
| ||
// Read some records from the consumer and collect the offsets to commit
ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);
// --------- Call before process ---------- //
// The application could infer output partition beforehand
Set<TopicPartition> outputPartitions = outputPartition(consumed);
producer.beginTransaction(outputPartitions, Optional.of(consumer.groupMetadata().groupId()));
// Do processing while sending out the built records asynchronously
for (ProducerRecord record : process(consumed))
producer.send(record);
// Pass the entire consumer group metadata
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
producer.commitTransaction(); |
...
Code Block | ||
---|---|---|
| ||
// Read some records from the consumer and collect the offsets to commit
ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);
// Do some processing and build the records we want to produce
List<ProducerRecord> processed = process(consumed);
// --------- Call after process ---------- //
Set<TopicPartition> outputPartitions = outputPartitions(processed);
producer.beginTransaction(outputPartitions, Optional.of(consumer.groupMetadata().groupId()));
// Write the records and commit offsets under a single transaction
for (ProducerRecord record : processed)
producer.send(record);
// Pass the entire consumer group metadata
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
producer.commitTransaction(); |
...