THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
// Transactional produce loop: each iteration begins a transaction, sends one record,
// then commits it, or aborts it when the send phase or the commit itself failed.
try {
    producer.initTransactions();
    // NOTE(review): the original declared this `volatile`, which is only legal on a field.
    // Presumably it is meant to be a volatile field that another thread clears to stop
    // the loop; confirm and hoist it to the enclosing class if so.
    boolean isRunning = true;
    while (isRunning) {
        // Set to false when beginning the transaction or sending the record throws.
        boolean shouldCommit;
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]), (metadata, exception) -> {
            });
            shouldCommit = true;
        } catch (KafkaException e) {
            shouldCommit = false;
        }
        try {
            if (shouldCommit) {
                TransactionResult result = producer.commitTransaction();
                if (result.failed()) {
                    // Cleanup state, rewind processing offsets
                    producer.abortTransaction();
                }
            } else {
                // Cleanup state, rewind processing offsets
                producer.abortTransaction();
            }
        } catch (KafkaException e) {
            // log the error
            throw e;
        }
    }
} finally {
    // Close the producer on every exit path, including the rethrow above.
    producer.close();
} |
...