...
    // Field on the enclosing class; another thread sets it to false to stop the loop.
    volatile boolean isRunning = true;

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
    producer.initTransactions();

    while (isRunning) {
        ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);

        // Data transmission phase.
        boolean shouldCommit;
        try {
            producer.beginTransaction();

            // Do some processing and build the records we want to produce.
            List<ProducerRecord<String, String>> processed = process(records);

            for (ProducerRecord<String, String> record : processed) {
                producer.send(record, (metadata, exception) -> {
                    // Not required to capture the exception here.
                });
            }
            // consumedOffsets holds the offsets to commit for the records polled above.
            producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

            shouldCommit = true;
        } catch (KafkaException e) {
            // Catch any exception thrown from the data transmission phase.
            shouldCommit = false;
        }

        // Commit phase.
        try {
            if (shouldCommit) {
                // TransactionCommitResult is the return type used in this example;
                // the released KafkaProducer.commitTransaction() returns void.
                TransactionCommitResult result = producer.commitTransaction();
                if (result.failed()) {
                    resetToLastCommittedPositions(consumer);
                    producer.abortTransaction();
                }
            } else {
                resetToLastCommittedPositions(consumer);
                producer.abortTransaction();
            }
        } catch (KafkaException e) {
            // Log the error.
            if (e.getCause() != null || e instanceof TimeoutException) {
                // The thrown exception is not fatal and the transaction state can be reset.
                resetToLastCommittedPositions(consumer);
                producer.abortTransaction();
            } else {
                throw e;
            }
        }
    }

    producer.close();
In the example above, we separate transactional processing into two phases: the data transmission phase and the commit phase. In the data transmission phase,
...