THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Aborts the ongoing transaction. Any unflushed produce messages will be aborted when this call is made. * This call will be a no-op if previous transactional call has thrown an exception and made the producer in an error state. * * Note that this method will raise {@link TimeoutException} if the transaction cannot be aborted before expiration * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted. * It is safe to retry in either case, but it is not possible to attempt a different operation (such as commitTransaction) * since the abort may already be in the progress of completing. If not retrying, the only option is to close the producer. * * @throws TimeoutException if the time taken for aborting the transaction has surpassed <code>max.block.ms</code>. * @throws InterruptException if the thread is interrupted while blocked */ public void abortTransaction(); |
Suggested Coding Pattern
With this proposed change, users may use transactional APIs as follows.
Code Block | ||||
---|---|---|---|---|
| ||||
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
if(e instanceof IllegalStateException ||
e instanceof ProducerFencedException ||
e instanceof UnsupportedVersionException ||
e instanceof AuthorizationException) {
producer.close();
}
} |
Compatibility, Deprecation, and Migration Plan
...
The alternatives are rejected because they all require a bigger changechanges.