THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
// Transactional produce loop: each iteration opens a transaction, sends one record,
// then commits if the send path succeeded or aborts if it did not.
producer.initTransactions();

// `volatile` is only legal on fields, not locals. In a real application `isRunning`
// would presumably be a volatile field (or an AtomicBoolean) that another thread
// clears to stop the loop -- adapt to the surrounding class.
boolean isRunning = true;

try {
    while (isRunning) {
        // `try` is a statement, not an expression, so record the outcome in a local.
        // It cannot be `final` because it is assigned in both the try and catch branches.
        boolean shouldCommit;
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]));
            shouldCommit = true;
        } catch (KafkaException e) {
            shouldCommit = false;
        }

        try {
            if (shouldCommit) {
                // NOTE(review): relies on commitTransaction() returning a TransactionResult,
                // as written in the original snippet -- confirm against the client API in use.
                TransactionResult result = producer.commitTransaction();
                if (result.failed()) {
                    // Cleanup state, rewind processing offsets
                    producer.abortTransaction();
                }
            } else {
                // Cleanup state, rewind processing offsets
                producer.abortTransaction();
            }
        } catch (KafkaException e) {
            // log the error
            throw e;
        }
    }
} finally {
    // Close the producer on every exit path, including the rethrow above.
    producer.close();
}
...