...
```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions();
// Typically a volatile field toggled by a shutdown hook.
boolean isRunning = true;
while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
    final boolean shouldCommit;
    try {
        producer.beginTransaction();
        // Do some processing and build the records we want to produce.
        List<ProducerRecord<String, String>> processed = process(records);
        for (ProducerRecord<String, String> record : processed) {
            producer.send(record, (metadata, exception) -> {
                // Not required to capture the exception here: a send failure
                // will surface again when the transaction is committed.
            });
        }
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
        shouldCommit = true;
    } catch (Exception e) {
        // Catch any exception thrown during the data transmission phase.
        shouldCommit = false;
    }
    try {
        if (shouldCommit) {
            producer.commitTransaction(); // May throw CommitFailedException.
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (CommitFailedException e) {
        // The transaction commit failed with an abortable error; the user
        // can reset the application state and resume with a new transaction.
        // The root cause is wrapped in the thrown exception.
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Any other exception is fatal: perform a controlled shutdown.
        producer.close();
        consumer.close();
        throw e;
    }
}
```
...
In the commit phase, we decide whether to commit or abort the transaction based on the result of the previous stage. With the new Producer API, `commitTransaction()` no longer throws non-fatal exceptions in their raw form. Instead, it wraps every non-fatal exception in a `CommitFailedException`, which makes such errors much easier to catch and handle on the caller side. As a consequence, any exception other than `CommitFailedException` caught during the commit phase is definitely fatal, so the user's error handling can be simplified to a controlled shutdown, while the root cause remains available through the wrapped exception.
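The wrap-and-unwrap pattern described above can be illustrated without a broker. The sketch below uses a hypothetical stand-in for Kafka's `CommitFailedException` (so it compiles without `kafka-clients` on the classpath) and a simulated `commitTransaction()`; it shows how the caller needs only two branches, abortable versus fatal, while the root cause stays reachable via `getCause()`:

```java
// Stand-in for org.apache.kafka.clients.consumer.CommitFailedException,
// used here only so the sketch runs without kafka-clients on the classpath.
class CommitFailedException extends RuntimeException {
    CommitFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

public class CommitPhaseSketch {

    // Simulates commitTransaction() failing with a non-fatal (abortable)
    // error: the root cause is wrapped rather than thrown raw.
    static void commitTransaction() {
        throw new CommitFailedException("transaction commit failed",
                new IllegalStateException("coordinator moved"));
    }

    // The caller only needs two branches: abortable vs. fatal.
    static String handleCommit() {
        try {
            commitTransaction();
            return "committed";
        } catch (CommitFailedException e) {
            // Non-fatal: abort, reset state, retry in a new transaction.
            // The original error is still available as the cause.
            return "abortable: " + e.getCause().getMessage();
        } catch (RuntimeException e) {
            // Everything else is fatal: perform a controlled shutdown.
            return "fatal: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(handleCommit()); // prints "abortable: coordinator moved"
    }
}
```

The design choice this illustrates: because every non-fatal error arrives under a single wrapper type, the `catch` ladder no longer has to enumerate individual abortable exceptions.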
...