Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSample.java
producer.initTransactions();
volatile boolean isRunning = true;

while (isRunning) {
	final boolean shouldCommit = 
	try {
    	producer.beginTransaction();
		producer.send(new ProducerRecord<>("topic", new byte[1], new byte[1]), (metadata, exception) -> {
			
		});
		return true;
 	} catch (KafkaException e) {
		return false;
 	}

	try {
        if (shouldCommit) {
            TransactionResult result = producer.commitTransaction();
            if (result.failed()) {
 				// Cleanup state, rewind processing offsets
				producer.abortTransaction();
			}
        } else {
			// Cleanup state, rewind processing offsets
            producer.abortTransaction();
        }
    } catch (KafkaException e) {
      // log the error
      throw e;
    }
}
}

producer.close();

...