...

Code Block (Java): KafkaProducer.java
/**
* Aborts the ongoing transaction. Any unflushed produce messages will be aborted when this call is made.
* This call will be a no-op if a previous transactional call has thrown an exception and put the producer in an error state.
*
* Note that this method will raise {@link TimeoutException} if the transaction cannot be aborted before expiration
* of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted.
* It is safe to retry in either case, but it is not possible to attempt a different operation (such as commitTransaction)
* since the abort may already be in the process of completing. If not retrying, the only option is to close the producer.
*
* @throws TimeoutException if the time taken for aborting the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void abortTransaction();
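
The retry rule in the Javadoc above (retry the abort, or close the producer, but never switch to another transactional operation) can be sketched as follows. This is only an illustration under stated assumptions: `TransactionalProducer` and `AbortTimeoutException` are hypothetical stand-ins for the real producer and for Kafka's `TimeoutException`, so that the sketch runs without a broker, and the `maxAttempts` bound is an arbitrary choice that the proposal does not prescribe.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AbortRetrySketch {

    /** Hypothetical stand-in for the transactional part of the producer API. */
    public interface TransactionalProducer {
        void abortTransaction();
        void close();
    }

    /** Hypothetical stand-in for Kafka's TimeoutException. */
    public static class AbortTimeoutException extends RuntimeException {
        public AbortTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Calls abortTransaction() up to maxAttempts times, retrying on timeout.
     * Returns true if the abort completed. Otherwise closes the producer and
     * returns false, because no other transactional operation may be attempted
     * once an abort has been started.
     */
    public static boolean abortWithRetry(TransactionalProducer producer, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                producer.abortTransaction();
                return true;
            } catch (AbortTimeoutException e) {
                // Safe to retry: the abort may already be in the process of completing.
            }
        }
        // Not retrying any further, so the only remaining option is to close.
        producer.close();
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated producer whose first two abort attempts time out.
        TransactionalProducer flaky = new TransactionalProducer() {
            @Override
            public void abortTransaction() {
                if (calls.incrementAndGet() < 3) {
                    throw new AbortTimeoutException("simulated timeout");
                }
            }

            @Override
            public void close() {
            }
        };
        System.out.println("aborted: " + abortWithRetry(flaky, 5));
        System.out.println("attempts: " + calls.get());
    }
}
```

With a real producer, the same loop would catch Kafka's `TimeoutException` instead of the stand-in. Handling `InterruptException` is left out of this sketch.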

Suggested Coding Pattern

With this proposed change, users may use transactional APIs as follows.

Code Block (Java): Example.java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    }
    producer.commitTransaction();
} catch (Exception e) {
    // With the proposed change, abortTransaction() is a no-op if the producer is already in an error state,
    // so it can be called unconditionally here.
    producer.abortTransaction();
    // These exceptions are treated as fatal here: the producer cannot be reused and is closed.
    if (e instanceof IllegalStateException
            || e instanceof ProducerFencedException
            || e instanceof UnsupportedVersionException
            || e instanceof AuthorizationException) {
        producer.close();
    }
}


Compatibility, Deprecation, and Migration Plan

...

The alternatives are rejected because they all require bigger changes.