THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();
volatile boolean isRunning = true;
while (isRunning) {
ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
final boolean shouldCommit;
try {
producer.beginTransaction();
// Do some processing and build the records we want to produce.
List<ProducerRecord> processed = process(consumed);
for (ProducerRecord record : processed)
producer.send(record, (metadata, exception) -> {
// not required to capture the exception here.
});
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
shouldCommit = true;
} catch (Exception e) {
// Catch any exception thrown from the data transmission phase.
shouldCommit = false;
}
try {
final boolean commitSucceed;
if (shouldCommit) {
commitSucceed = producer.commitTransaction(); // new API with a returned flag.
} else {
commitSucceed = false;
}
if (commitSucceed) {
// Commit to external storage if necessary
} else {
resetToLastCommittedPositions(consumer);
producer.abortTransaction();
}
} catch (TimeoutException e) {
// the transaction state could be reset when the original commit/abort times out.
// This is a best-effort demo approach to avoid a producer shutdown,
// if abort times out again, the timeout exception will be thrown
// to the application layer. The total retry time would be 2 * max.block.ms
resetToLastCommittedPositions(consumer);
producer.abortTransaction();
} catch (KafkaException e) {
producer.close();
consumer.close();
throw e;
}
} |
...