THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
**Code Block:**
// Transactional consume-process-produce loop: each iteration processes one batch of
// consumed records and commits the produced records together with the consumed
// offsets in a single Kafka transaction (exactly-once semantics).
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
producer.initTransactions();
// NOTE(review): `volatile` is only legal on fields, not locals. In real code this
// flag would be a volatile field (or AtomicBoolean) flipped by a shutdown hook.
boolean isRunning = true;
while (isRunning) {
    ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
    boolean shouldCommit;
    try {
        producer.beginTransaction();

        // Do some processing and build the records we want to produce.
        List<ProducerRecord<String, String>> processed = process(records);
        for (ProducerRecord<String, String> record : processed) {
            producer.send(record, (metadata, exception) -> {
                // Not required to capture the exception here; send failures will
                // surface when the transaction is committed.
            });
        }

        // Send the consumed offsets as part of the transaction so they commit
        // atomically with the produced records. `consumedOffsets` is presumably
        // built from `records` during processing — TODO confirm against caller.
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
        shouldCommit = true;
    } catch (Exception e) {
        // Any exception thrown from the data transmission phase means the
        // transaction must be aborted below.
        shouldCommit = false;
    }

    try {
        if (shouldCommit) {
            producer.commitTransaction();
        } else {
            resetToLastCommittedPositions(consumer);
            producer.abortTransaction();
        }
    } catch (TimeoutException e) {
        // The transaction state could be reset when the original commit/abort
        // times out. This is a best-effort demo approach to avoid a producer
        // shutdown: retry the abort once; if it times out again, the timeout
        // exception propagates to the application layer. The total retry time
        // would be 2 * max.block.ms.
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    } catch (KafkaException e) {
        // Fatal error: release client resources and propagate to the caller.
        producer.close();
        consumer.close();
        throw e;
    }
}
...